# <center> Data Analysis Project<br><br>Курс "Анализ данных в экономике и финансах"<br><br>Сбор и загрузка первичных данных: II ЭТАП</center>

In [1]:
import json
import numpy as np
import pandas as pd

In [2]:
# pip install mysql-connector-python

In [3]:
from getpass import getpass
from mysql.connector import connect, Error

## 📺Таблица Movies

**[Сбор данных при помощи API](https://colab.research.google.com/drive/17OMgnKrUu2EXH6cf8URhZZ_zlbKgS0Vj?usp=sharing)**
<br>
<br>Открываем собранные данные:
 1. список премьер в российских кинотеатрах с 2011 по 2022 гг.;
 2. основная информация по каждому из фильмов;
 3. информация о кассовых сборах и бюджете каждого из кинопроизведений.

In [4]:
with open('kinopoisk_premiers.json') as json_file:
    kinopoisk_premiers = json.load(json_file)

In [5]:
with open('kinopoisk_media_film_info.json') as json_file:
    kinopoisk_media_film_info = json.load(json_file)

In [6]:
with open('kinopoisk_media_box_office.json') as json_file:
    kinopoisk_media_box_office = json.load(json_file)

В читабельном варианте json-файл с информацией о кассовых сборах и бюджете представить без предварительной обработки невозможно, поэтому создаём функцию, которая будет доставать информацию:
1. Бюджет фильма (+ валюта бюджета);
2. Российские кассовые сборы (+ валюта российских кассовых сборов);
3. Мировые кассовые сборы (+ валюта мировых кассовых сборов).

In [7]:
def getFilmBoxOffice(json_file, box_office_list):
    for i in range(len(json_file)):
        d = {}
        budget = []
        currency_budget = []
        rus = []
        currency_rus = []
        world = []
        currency_world = []
        trial_list = list(json_file[i].values())[0]
        for k in range(len(trial_list)):
            if trial_list[k]['type'] == 'BUDGET':
                budget.append(trial_list[k]['amount'])
                currency_budget.append(trial_list[k]['currencyCode'])
            elif trial_list[k]['type'] == 'RUS':
                rus.append(trial_list[k]['amount'])
                currency_rus.append(trial_list[k]['currencyCode'])
            elif trial_list[k]['type'] == 'WORLD':
                world.append(trial_list[k]['amount'])
                currency_world.append(trial_list[k]['currencyCode'])
        d['kinopoiskId'] = int(list(json_file[i].keys())[0])
        if len(budget) == 0:
            d['budget'] = None
            d['currency_budget'] = None
        else:
            d['budget'] = budget[0]
            d['currency_budget'] = currency_budget[0]
        if len(rus) == 0:
            d['rus'] = None
            d['currency_rus'] = None
        else:
            d['rus'] = rus[0]
            d['currency_rus'] = currency_rus[0]
        if len(world) == 0:
            d['world'] = None
            d['currency_world'] = None
        else:
            d['world'] = world[0]
            d['currency_world'] = currency_world[0]
        box_office_list.append(d)

Вся информация при выполнении функции записывается в python-словарь, затем добавляется в общий список (таким образом принимая вид, схожий с json-файлом).

In [8]:
kinopoisk_media_box_office_tr = []
getFilmBoxOffice(kinopoisk_media_box_office, kinopoisk_media_box_office_tr)

Считываем все три файла.

In [9]:
kinopoisk_premiers_df = pd.json_normalize(kinopoisk_premiers)
kinopoisk_media_film_info_df = pd.json_normalize(kinopoisk_media_film_info)
kinopoisk_media_box_office_df = pd.json_normalize(kinopoisk_media_box_office_tr)

Для сопоставимости таблиц, меняем название колонки "filmId" на "kinopoiskId".

In [10]:
kinopoisk_media_film_info_df = kinopoisk_media_film_info_df.rename(columns={'filmId': 'kinopoiskId'})

Удаляем возможные дубликаты записей.

In [11]:
kinopoisk_premiers_df = kinopoisk_premiers_df.drop_duplicates(
    subset=['kinopoiskId', 'nameRu'], keep='first', ignore_index=True)
kinopoisk_media_film_info_df = kinopoisk_media_film_info_df.drop_duplicates(
    subset=['kinopoiskId', 'webUrl'], keep='first', ignore_index=True)
kinopoisk_media_box_office_df = kinopoisk_media_box_office_df.drop_duplicates(
    subset=['kinopoiskId', 'budget'], keep='first', ignore_index=True)

## 👔 Таблица Staff

Открываем файл с информацией о создателях каждого из фильмов.

In [12]:
with open('kinopoisk_media_staff.json') as json_file:
    kinopoisk_media_staff = json.load(json_file)

Так же, как и с кассовыми сборами и бюджетами, создаём функцию, которая будет доставать информацию об id, имени (в русском и английском варианте), наименование должности (в титрах). Причем выбираем первые (главные) имена для каждого фильма:

1. Режиссер;
2. Актер;
3. Продюсер;
4. Сценарист;
5. Оператор;
6. Монтажер.

In [13]:
def getFilmStaff(json_file, staff_list):
    for i in range(len(json_file)):
        d = {}
        
        director_id = []
        director_nameRu = []
        director_nameEn = []
        director_professionKey = []
        
        actor_id = []
        actor_nameRu = []
        actor_nameEn = []
        actor_professionKey = []
        
        producer_id = []
        producer_nameRu = []
        producer_nameEn = []
        producer_professionKey = []
        
        writer_id = []
        writer_nameRu = []
        writer_nameEn = []
        writer_professionKey = []
        
        operator_id = []
        operator_nameRu = []
        operator_nameEn = []
        operator_professionKey = []
        
        editor_id = []
        editor_nameRu = []
        editor_nameEn = []
        editor_professionKey = []
        
        trial_list = list(json_file[i].values())[0]
        for k in range(len(trial_list)):
            if trial_list[k]['professionKey'] == 'DIRECTOR' and len(director_id) == 0:
                director_id.append(trial_list[k]['staffId'])
                director_nameRu.append(trial_list[k]['nameRu'])
                director_nameEn.append(trial_list[k]['nameEn'])
                director_professionKey.append(trial_list[k]['professionKey'])
            elif trial_list[k]['professionKey'] == 'ACTOR' and len(actor_id) == 0:
                actor_id.append(trial_list[k]['staffId'])
                actor_nameRu.append(trial_list[k]['nameRu'])
                actor_nameEn.append(trial_list[k]['nameEn'])
                actor_professionKey.append(trial_list[k]['professionKey'])
            elif trial_list[k]['professionKey'] == 'PRODUCER' and len(producer_id) == 0:
                producer_id.append(trial_list[k]['staffId'])
                producer_nameRu.append(trial_list[k]['nameRu'])
                producer_nameEn.append(trial_list[k]['nameEn'])
                producer_professionKey.append(trial_list[k]['professionKey'])
            elif trial_list[k]['professionKey'] == 'WRITER' and len(writer_id) == 0:
                writer_id.append(trial_list[k]['staffId'])
                writer_nameRu.append(trial_list[k]['nameRu'])
                writer_nameEn.append(trial_list[k]['nameEn'])
                writer_professionKey.append(trial_list[k]['professionKey'])
            elif trial_list[k]['professionKey'] == 'OPERATOR' and len(operator_id) == 0:
                operator_id.append(trial_list[k]['staffId'])
                operator_nameRu.append(trial_list[k]['nameRu'])
                operator_nameEn.append(trial_list[k]['nameEn'])
                operator_professionKey.append(trial_list[k]['professionKey'])
            elif trial_list[k]['professionKey'] == 'EDITOR' and len(editor_id) == 0:
                editor_id.append(trial_list[k]['staffId'])
                editor_nameRu.append(trial_list[k]['nameRu'])
                editor_nameEn.append(trial_list[k]['nameEn'])
                editor_professionKey.append(trial_list[k]['professionKey'])
        d['kinopoiskId'] = int(list(json_file[i].keys())[0])
        
        staff = ['director', 'actor', 'producer', 'writer', 'operator', 'editor']
        
        trial_staff_dir = [director_id, director_nameRu, director_nameEn, director_professionKey]
        trial_staff_act = [actor_id, actor_nameRu, actor_nameEn, actor_professionKey]
        trial_staff_prod = [producer_id, producer_nameRu, producer_nameEn, producer_professionKey]
        trial_staff_writ = [writer_id, writer_nameRu, writer_nameEn, writer_professionKey]
        trial_staff_oper = [operator_id, operator_nameRu, operator_nameEn, operator_professionKey]
        trial_staff_edit = [editor_id, editor_nameRu, editor_nameEn, editor_professionKey]
        
        trial_staff_array = [trial_staff_dir, trial_staff_act, trial_staff_prod, 
                             trial_staff_writ, trial_staff_oper, trial_staff_edit]
        
        for s in range(len(staff)):
            if len(trial_staff_array[s][0]) == 0:
                d[str(staff[s]) + '_id'] = None
                d[str(staff[s]) + '_nameRu'] = None
                d[str(staff[s]) + '_nameEn'] = None
                d[str(staff[s]) + '_professionKey'] = None
            else:
                d[str(staff[s]) + '_id'] = trial_staff_array[s][0][0]
                d[str(staff[s]) + '_nameRu'] = trial_staff_array[s][1][0]
                d[str(staff[s]) + '_nameEn'] = trial_staff_array[s][2][0]
                d[str(staff[s]) + '_professionKey'] = trial_staff_array[s][3][0]
                
        staff_list.append(d)

Осущетслвяем функцию, записываем информацию в список таким образом, чтобы содержание приняло вид json-файла.

In [14]:
kinopoisk_media_staff_tr = []
getFilmStaff(kinopoisk_media_staff, kinopoisk_media_staff_tr)

Считываем список в формат датафрейм.

In [15]:
kinopoisk_media_staff_df = pd.json_normalize(kinopoisk_media_staff_tr)

Переводим тип признаков, содержащих id, в integer.

In [16]:
for s in ['director', 'actor', 'producer', 'writer', 'operator', 'editor']:
    kinopoisk_media_staff_df[s + '_id'] = pd.to_numeric(
        kinopoisk_media_staff_df[s + '_id'], errors='coerce'
    ).astype(pd.Int64Dtype())

In [17]:
kinopoisk_media_staff_df = kinopoisk_media_staff_df.drop_duplicates(
    subset=['kinopoiskId', 'actor_id'], keep='first', ignore_index=True)

Делаем временную таблицу только с id создателей каждого из фильмов.

In [18]:
kinopoisk_media_staff_df_1 = kinopoisk_media_staff_df[['kinopoiskId', 'director_id', 'actor_id', 'producer_id', 'writer_id', 'operator_id', 'editor_id']]

## 🔗 Соединение и перенос таблиц из Pandas в MySQL DB

Поэтапно соединяем таблицы между собой: сначала список кинопремьер с информацией о каждой из них.

In [19]:
kinopoisk_movies_merged = kinopoisk_premiers_df.merge(kinopoisk_media_film_info_df, how='left', on='kinopoiskId')

Удаляем дублирующиеся столбцы.

In [20]:
kinopoisk_movies_merged = kinopoisk_movies_merged.drop(['nameRu_y', 'nameEn_y', 'countries_y', 'genres_y',
                                                        'year_y', 'premiereRu_y', 'posterUrl_y', 'posterUrlPreview_y'], axis=1)

Переименовываем измененные названия столбцов.

In [21]:
kinopoisk_movies_merged = kinopoisk_media_film_info_df.rename(columns={'nameRu_x': 'nameRu',
                                                                       'nameEn_x': 'nameEn',
                                                                       'year_x': 'year',
                                                                       'posterUrl_x': 'posterUrl',
                                                                       'posterUrlPreview_x': 'posterUrlPreview',
                                                                       'countries_x': 'countries',
                                                                       'genres_x': 'genres',
                                                                       'premiereRu_x': 'premiereRu'})

Присоединяем: информацию о кассовых сборах и бюджете каждого из фильмов, id создателей (разделены по столбцам в зависимости от должности).

In [22]:
kinopoisk_movies_merged = kinopoisk_movies_merged.merge(kinopoisk_media_box_office_df, how='left', on='kinopoiskId')

In [23]:
kinopoisk_movies_merged = kinopoisk_movies_merged.merge(kinopoisk_media_staff_df_1, how='inner', on='kinopoiskId')

Некоторые данные смотрятся некрасиво (представлены в виде кортежа). Переведем эти признаки в строчный вид с разделителем в виде запятой с пробелом.

In [24]:
def convertValues(df, column):
#     ', '.join(sorted(set().union(*(d.values() for d in kinopoisk_movies_merged['countries'][1]))))
    df[column] = df[column].apply(lambda x: ', '.join(sorted(set().union(*(d.values() for d in x)))))

In [25]:
convertValues(kinopoisk_movies_merged, 'countries')
convertValues(kinopoisk_movies_merged, 'genres')

Формируем подключение к локальному серверу MySQL.

In [26]:
conn_data = connect(host="localhost",
                    user='root',
                    password='Kinopoisk231.')
cur_data = conn_data.cursor()

Создаём новую пустую базу данных фильмов Кинопоиска.

In [27]:
create_db_query = "CREATE DATABASE kinopoisk_movies"
cur_data.execute(create_db_query)

Обращаемся уже конкретно к созданной базе данных.

In [28]:
conn_data = connect(host="localhost",
                    user='root',
                    password='Kinopoisk231.',
                    database='kinopoisk_movies')
cur_data = conn_data.cursor()

Формируем новую таблицу с информацией о фильмах.

In [29]:
cur_data.execute('''
    CREATE TABLE IF NOT EXISTS movies (
        kinopoiskId     INTEGER UNIQUE NOT NULL PRIMARY KEY,
        nameRu          TEXT,
        nameEn          TEXT,
        year            INT,
        webUrl          TEXT NOT NULL,
        description     TEXT,
        ratingAgeLimits INT,
        countries       TEXT,
        genres          TEXT,
        filmLength      TEXT, 
        premiereRu      TEXT,
        premiereWorld   TEXT,
        distributors    TEXT,
        budget          FLOAT,
        currency_budget TEXT,
        rus             FLOAT,
        currency_rus    TEXT,
        world           FLOAT,
        currency_world  TEXT,
        director_id     INT,
        actor_id        INT,
        producer_id     INT,
        writer_id       INT,
        operator_id     INT,
        editor_id       INT
    );
''')
conn_data.commit()
# filmLength ПОСМОТРЕТЬ НА ЭТАПЕ ОБРАБОТКИ В ПАНДАС И КОНВЕРТИРОВАТЬ В МИНУТЫ

In [30]:
# create_db_query = "DROP TABLE movies"
# cur_data.execute(create_db_query)

Перенесём построчно данные из пандас датафрейма в MySQL таблицу "movies" при помощи функции.

In [31]:
def fillTableSQL(df, table):
    create_db_query = '''INSERT INTO {} ({})
                         VALUES ({})'''.format(table, ', '.join(df.columns), 
                                               ', '.join(['%s' for i in range(len(df.columns))]))
#     print(create_db_query)
    for r in range(df.shape[0]):
        values = tuple(np.where(pd.isnull(df.values[r]) == True, None, df.values[r]))
        cur_data.execute(create_db_query, values)

Финальная выборка данных:

In [32]:
final_df = kinopoisk_movies_merged[['kinopoiskId', 'nameRu', 'nameEn', 'year', 'webUrl', 'description', 'ratingAgeLimits',
                                    'countries', 'genres', 'filmLength', 'premiereRu', 'premiereWorld', 'distributors', 
                                    'budget', 'currency_budget', 'rus', 'currency_rus', 'world', 'currency_world',
                                    'director_id', 'actor_id', 'producer_id', 'writer_id', 'operator_id', 'editor_id']]

### 🧩 Заполнение таблицы Movies

In [33]:
fillTableSQL(final_df, 'movies')
conn_data.commit()

Переходим к созданию и заполнению второй таблицы MySQL - создатели фильмов ("staff").

In [34]:
staff = ['director', 'actor', 'producer', 'writer', 'operator', 'editor']
df_list = ['kinopoisk_media_staff_df_' + str(s) for s in range(len(staff))]

for s in range(len(staff)):
    df_list[s] = kinopoisk_media_staff_df[[staff[s] + '_id', staff[s] + '_nameRu', staff[s] + '_nameEn']]
    df_list[s] = df_list[s].rename(columns={staff[s] + '_id': 'staff_id',
                                            staff[s] + '_nameRu': 'nameRu',
                                            staff[s] + '_nameEn': 'nameEn'})

В Pandas создаём объединенную таблицу всех создателей фильмов.

In [35]:
staff_media_df = pd.concat([df_list[0], df_list[1], df_list[2],
                            df_list[3], df_list[4], df_list[5]], ignore_index=True)

In [36]:
staff_media_df = staff_media_df.drop_duplicates(
    subset=['staff_id', 'nameRu'], keep='first', ignore_index=True)
staff_media_df = staff_media_df.dropna()

Формируем новую таблицу в MySQL - "staff".

In [37]:
cur_data.execute('''
    CREATE TABLE IF NOT EXISTS staff (
        staff_id     INTEGER UNIQUE NOT NULL PRIMARY KEY,
        nameRu       TEXT,
        nameEn       TEXT
    );
''')
conn_data.commit()

In [38]:
# create_db_query = "DROP TABLE staff"
# cur_data.execute(create_db_query)

In [39]:
fillTableSQL(staff_media_df, 'staff')
conn_data.commit()

Добавляем связку - отношение id персоны из таблицы "staff" с id режиссеров, актеров, продюсеров, сценаристов, операторов и монтажеров из таблицы "movies".

In [40]:
for s in staff:
    create_db_query = '''ALTER TABLE movies
                         ADD CONSTRAINT FOREIGN KEY (`{}_id`)
                         REFERENCES staff (`staff_id`);'''.format(s)
    cur_data.execute(create_db_query)
conn_data.commit()

Выгружаем id создателей для дополнительной выгрузки данных из API.

In [41]:
np.savez("staff_id", staff_media_df['staff_id'].to_list())

### 💡 Добаление инфо о рейтингах Кинопоиск и о работниках

После дополнительной выгрузки информации о создателях фильмов, считываем файл и переводим его в удобный формат Pandas DataFrame.

In [42]:
with open('kinopoisk_media_staff_info.json') as json_file:
    kinopoisk_media_staff_info = json.load(json_file)

In [43]:
kinopoisk_media_staff_info_df = pd.json_normalize(kinopoisk_media_staff_info)

In [44]:
staff_info_df = kinopoisk_media_staff_info_df[['personId', 'webUrl', 'birthday', 'sex', 'birthplace', 'hasAwards', 'profession']]

Добавляем новые колонки в существующую таблицу "staff", а также создаем временную таблицу "staff_info", из которой будем переносить колонки.

In [45]:
create_db_query = '''ALTER TABLE staff
                     ADD COLUMN webUrl     TEXT,
                     ADD COLUMN birthday   TEXT,
                     ADD COLUMN sex        TEXT,
                     ADD COLUMN birthplace TEXT,
                     ADD COLUMN hasAwards  TEXT,
                     ADD COLUMN profession TEXT;
                     
                     CREATE TABLE IF NOT EXISTS staff_info (
                         personId       INT,
                         webUrl         TEXT,
                         birthday       TEXT,
                         sex            TEXT,
                         birthplace     TEXT,
                         hasAwards      TEXT,
                         profession     TEXT);'''
cur_data.execute(create_db_query)

In [46]:
conn_data = connect(host="localhost",
                    user='root',
                    password='Kinopoisk231.',
                    database='kinopoisk_movies')
cur_data = conn_data.cursor()

Заполняем временную таблицу.

In [47]:
fillTableSQL(staff_info_df, 'staff_info')
conn_data.commit()

Обновляем старую таблицу создателей переносом столбцов из новой.

In [48]:
create_db_query = '''UPDATE staff 
                     LEFT JOIN staff_info 
                     ON staff.staff_id = staff_info.personId
                     SET staff.webUrl = staff_info.webUrl,
                         staff.birthday = staff_info.birthday,
                         staff.sex = staff_info.sex,
                         staff.birthplace = staff_info.birthplace,
                         staff.hasAwards = staff_info.hasAwards,
                         staff.profession = staff_info.profession
                     ;'''
cur_data.execute(create_db_query)

Удаляем временную таблицу с информацией о создателях.

In [49]:
create_db_query = "DROP TABLE staff_info"
cur_data.execute(create_db_query)

In [50]:
conn_data.commit()

Выгружаем рейтинг Кинопоиска для каждого фильма.

In [51]:
def getRating(json_file, staff_list):
    for k in range(len(json_file)):
        for i in json_file[k]['films']:
            d = {}
            d['kinopoiskId'] = i['filmId']
            d['nameRu'] = i['nameRu']
            d['nameEn'] = i['nameEn']
            d['rating'] = i['rating']
            staff_list.append(d)

In [52]:
kinopoisk_media_ratings = []
getRating(kinopoisk_media_staff_info, kinopoisk_media_ratings)

In [53]:
kinopoisk_media_ratings_df = pd.json_normalize(kinopoisk_media_ratings)

In [54]:
kinopoisk_media_ratings_df = kinopoisk_media_ratings_df.drop_duplicates(
    subset=['kinopoiskId', 'nameRu'], keep='first', ignore_index=True)

Аналогично таблице создателей фильмов, создаем временную таблицу с рейтингами.

In [55]:
create_db_query = '''ALTER TABLE movies
                     ADD COLUMN ratingKinopoisk     FLOAT;
                     
                     CREATE TABLE IF NOT EXISTS rating_info (
                         kinopoiskId       INT,
                         nameRu       TEXT,
                         nameEn       TEXT,
                         rating       FLOAT);'''
cur_data.execute(create_db_query)

In [56]:
conn_data = connect(host="localhost",
                    user='root',
                    password='Kinopoisk231.',
                    database='kinopoisk_movies')
cur_data = conn_data.cursor()

На всякий случай обновляем общую таблицу Pandas с фильмами.

In [57]:
final_df = final_df.merge(kinopoisk_media_ratings_df[['kinopoiskId', 'rating']], how='left', on='kinopoiskId')

Заполняем временную таблицу с рейтингами.

In [58]:
fillTableSQL(final_df[['kinopoiskId', 'rating']], 'rating_info')
conn_data.commit()

Обновляем общую таблицу с фильмами столбцами из временной.

In [59]:
create_db_query = '''UPDATE movies 
                     LEFT JOIN rating_info 
                     ON movies.kinopoiskId = rating_info.kinopoiskId
                     SET movies.ratingKinopoisk = rating_info.rating
                     ;'''
cur_data.execute(create_db_query)

Удаляем временную таблицу с рейтингами.

In [60]:
create_db_query = "DROP TABLE rating_info" # movies, staff, staff_info, rating_info
cur_data.execute(create_db_query)

In [61]:
conn_data.commit()