In [46]:
import ast
import os
import re
from typing import Any

import joblib
import numpy as np
import pandas as pd
import yaml
from transliterate import translit

# Описание задачи

**Общая цель проекта:** Создание веб-приложения для анализа, обучения модели и предсказания сборов фильмов в США на основе данных, полученных с сайта Кинопоиск. Используется FastAPI для backend и Streamlit для frontend.

**Цель текущего ноутбука:** 
Реализовать функции для процесса предобработки входных данных и получения предсказаний.

# Импорт данных

In [3]:
config_path = '../config/params.yml'
config = yaml.load(open(config_path), Loader=yaml.FullLoader)

preproc = config['preprocessing']
evaluate = config['evaluate']

Описание полей:

- **movie_id** - уникальный идентификатор фильма.
- **movie_name** - название фильма.
- **year** - год выпуска фильма.
- **votes_kp, votes_imdb** - количество оценок фильма на Кинопоиске и IMDB.
- **rating_kp, rating_imdb** - рейтинги фильма на Кинопоиске и IMDB.
- **votes_filmCritics** - количество оценок кинокритиков в мире
- **rating_filmCritics** - рейтинг кинокритиков в мире
- **votes_await** - количество ожидающих фильм на Кинопоиске
- **movieLength** - длина фильма
- **ageRating, ratingMpaa** - возрастные рейтинги фильма
- **type** - тип картины (фильм/мультфильм)
- **genres** - жанры фильма
- **countries** - страны фильма
- **fees_usa** - сборы фильма в мире, США, России
- **fees_world_currency и т.д.** - валюта сборов фильма
- **budget** - бюджет и валюта бюджета фильма
- **videos_trailers_number** - количество трейлеров
- **Production** - студии производства
- **Special_effects** - студии спецэффектов
- **actor_metric** - метрики актеров
- **director_metric** - метрики режиссеров
- **writer_metric** - метрики сценристов

In [29]:
# Загрузка тестовых данных для предсказаний
df_test = pd.read_csv(evaluate['predict_path'], sep=preproc['data_separator'])
df_test[:5]

Unnamed: 0,movie_id,movie_name,year,votes_kp,votes_imdb,votes_filmCritics,votes_await,rating_kp,rating_imdb,rating_filmCritics,...,genres,countries,fees_usa,videos_trailers_number,budget,actor_metric,writer_metric,director_metric,Special_effects,Production
0,463860,Если я останусь,2014,65773.0,126041.0,142.0,14105.0,6.901,6.7,4.9,...,"['фантастика', 'фэнтези', 'драма', 'мелодрама'...",['США'],50474843,3.0,11000000.0,11315090.0,,20230.43,[],"['Metro-Goldwyn-Mayer (MGM)', 'New Line Cinema']"
1,693962,Герой,2012,2520.0,3597.0,11.0,30.0,6.147,6.1,5.1,...,"['фэнтези', 'боевик', 'приключения']",['Китай'],35067,1.0,40000000.0,545390.2,60063.3,,[],[]
2,51319,Милые кости,2009,190210.0,174447.0,246.0,12249.0,7.143,6.6,5.0,...,"['фэнтези', 'триллер', 'драма', 'криминал', 'д...","['США', 'Великобритания', 'Новая Зеландия']",44114232,1.0,65000000.0,12472370.0,21519240.0,9924511.0,['Industrial Light & Magic (ILM)'],[]
3,408876,Хоббит: Пустошь Смауга,2013,434943.0,680751.0,254.0,102689.0,8.02,7.8,6.8,...,"['фэнтези', 'приключения', 'боевик']","['Новая Зеландия', 'США']",258387334,5.0,225000000.0,35344860.0,33574010.0,13624440.0,[],"['Metro-Goldwyn-Mayer (MGM)', 'New Line Cinema']"
4,470769,Цветок пустыни,2009,38628.0,13745.0,18.0,5.0,7.968,7.3,6.2,...,"['драма', 'биография']","['Великобритания', 'Германия', 'Австрия', 'Фра...",44348,1.0,,4050092.0,,,[],[]


In [32]:
df_test.shape

(964, 24)

In [33]:
df_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 964 entries, 0 to 963
Data columns (total 24 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   movie_id                964 non-null    int64  
 1   movie_name              964 non-null    object 
 2   year                    964 non-null    int64  
 3   votes_kp                964 non-null    float64
 4   votes_imdb              964 non-null    float64
 5   votes_filmCritics       904 non-null    float64
 6   votes_await             562 non-null    float64
 7   rating_kp               964 non-null    float64
 8   rating_imdb             964 non-null    float64
 9   rating_filmCritics      903 non-null    float64
 10  movieLength             964 non-null    float64
 11  ageRating               905 non-null    float64
 12  ratingMpaa              964 non-null    object 
 13  type                    964 non-null    object 
 14  genres                  964 non-null    ob

In [12]:
def open_json(name: str) -> Any:
    """
    Открытие и чтение содержимого JSON файла
    :param name: путь к JSON файлу для чтения
    :return: содержимое JSON файла
    """
    with open(name, 'r', encoding='utf-8') as file:
        return json.load(file)


def save_json(name: str, json_data: Any) -> None:
    """
    Сохранение данных в формате JSON в файл
    :param name: путь к файлу для сохранения данных
    :param json_data: данные для сохранения в формате JSON
    """
    with open(name, 'w', encoding='utf-8') as file:
        json.dump(json_data, file, indent=4, ensure_ascii=False)

# Получение бинаризованных колонок

In [7]:
def series_to_type_list(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Преобразование колонок в тип список"""
    for col in columns:
        df[col] = df[col].fillna('[]')
        df[col] = df[col].apply(
            lambda x: x if isinstance(x, list) else ast.literal_eval(x))

    return df


def make_dummies(df: pd.DataFrame, col: str, p: float = None) -> pd.DataFrame:
    """
    Создает бинаризованные колонки для указанного признака в dataframe.
    Фильтрует колонки по заданному порогу дисперсии, если необходимо.
    """
    dummies = df[col].str.join('|').str.get_dummies(sep='|')

    if p is not None:
        # Расчет частоты каждой категории
        freq = dummies.sum() / len(dummies)

        # Фильтрация категорий, которые встречаются чаще, чем заданный порог
        filtered_categories = freq[freq > p].index
        dummies = dummies[filtered_categories]

    return dummies


def transform_columns_names(df: pd.DataFrame) -> pd.DataFrame:
    """Пайплайн преобразования названий колонок"""

    # Перевод названий колонок в латинские буквы
    df.columns = [
        translit(x, language_code='ru', reversed=True) for x in df.columns
    ]
    # Замена пробелов в названиях колонок на нижние подчеркивания
    df.columns = [col.replace(' ', '_') for col in df.columns]

    # Фильтарция названий колонок.
    # Остаются только нижние подчеркивания, англ. буквы и цифры
    df = df.rename(columns=lambda x: re.sub('[^A-Za-z0-9_]+', '', x))

    return df


def check_dummies_columns_evaluate(data: pd.DataFrame,
                                   train_sequence_path: str) -> pd.DataFrame:
    """
    Добавление недостающих признаков и упорядочивание согласно train
    :param data: датасет test
    :param unique_values_path: путь до списка с признаками train для сравнения
    :return: датасет test
    """

    train_sequence = open_json(train_sequence_path)
    data_sequence = data.columns

    # Поиск недостающих бинаризованных колонок
    missing_columns = set(train_sequence) - set(data_sequence)

    for col in missing_columns:
        data[col] = 0

    return data[train_sequence]


def preprocess_dummies(df: pd.DataFrame,
                       flg_evaluate: bool = True,
                       **kwargs) -> pd.DataFrame:
    """
    Создание и проверка бинаризованных колонок
    :param df: датасет
    :param flg_evaluate: флаг для evaluate
    :return: датасет с бинаризованными данными
    """

    dummy_columns = kwargs['dummies']['columns']

    assert all(col in df.columns for col in dummy_columns), \
        "Не все требуемые колонки для бинаризации присутствуют в данных"

    # Преобразование колонок в тип список для бинаризации
    df = series_to_type_list(df, dummy_columns)
    all_dummies = pd.DataFrame()

    for col in dummy_columns:
        dummies = make_dummies(df, col, kwargs['variance_threshold'])

        if not flg_evaluate:
            # Сохранение списка оригинальных названий колонок для UI
            sorted_counts = dummies.sum().sort_values(ascending=False)
            dummies = dummies[sorted_counts.index]

            original_dummies_dict = {col: dummies.columns.tolist()}
            save_json(kwargs['dummies'][col], original_dummies_dict)

        all_dummies = pd.concat([all_dummies, dummies], axis=1)

    # Преобразование названий колонок для LightGBM
    all_dummies = transform_columns_names(all_dummies)

    if flg_evaluate:
        # Проверка на соответствие бинаризованных колонок
        all_dummies = check_dummies_columns_evaluate(
            all_dummies, kwargs['dummies']['sequence_path'])
    else:
        # Сохраняем порядок бинаризованных колонок для обучения
        save_json(kwargs['dummies']['sequence_path'],
                  all_dummies.columns.tolist())

    return all_dummies

# Преобразование данных без бинаризации

In [8]:
def transform_types(data: pd.DataFrame,
                    change_type_columns: dict) -> pd.DataFrame:
    """
    Преобразование признаков в заданный тип данных
    :param data: датасет
    :param change_type_columns: словарь с признаками и типами данных
    :return:
    """
    return data.astype(change_type_columns, errors="raise")


def check_columns_evaluate(data: pd.DataFrame,
                           unique_values_path: str) -> pd.DataFrame:
    """
    Проверка на наличие признаков из train и упорядочивание признаков согласно train
    :param data: датасет test
    :param unique_values_path: путь до списока с признаками train для сравнения
    :return: датасет test
    """

    unique_values = open_json(unique_values_path)
    column_sequence = unique_values.keys()

    assert set(column_sequence) == set(data.columns), "Разные признаки"
    return data[column_sequence]


def save_unique_train_data(data: pd.DataFrame, drop_columns: list,
                           target_column: str,
                           unique_values_path: str) -> None:
    """
    Сохранение словаря с признаками и уникальными значениями
    :param data: датасет
    :param drop_columns: список с признаками для удаления
    :param target_column: целевая переменная
    :param unique_values_path: путь до файла со словарем
    :return: None
    """
    data = data.drop(columns=drop_columns + [target_column],
                     axis=1,
                     errors="ignore")

    # создаем словарь с уникальными значениями для вывода в UI
    dict_unique = {
        key: data[key].dropna().unique().tolist()
        for key in data.columns
    }

    save_json(unique_values_path, dict_unique)


def preprocess_data(df: pd.DataFrame,
                    flg_evaluate: bool = True,
                    **kwargs) -> pd.DataFrame:
    """
    Пайплайн по предобработке данных
    :param df: датасет
    :param flg_evaluate: флаг для evaluate
    :return: датасет
    """
    
    df = df.drop(kwargs['drop_columns'] + [kwargs['target_column']],
                 axis=1,
                 errors="ignore")
    # проверка dataset на совпадение с признаками из train
    # либо сохранение уникальных данных с признаками из train
    if flg_evaluate:
        df = check_columns_evaluate(
            data=df, unique_values_path=kwargs["unique_values_path"])
    else:
        save_unique_train_data(
            data=df,
            drop_columns=kwargs["drop_columns"],
            target_column=kwargs["target_column"],
            unique_values_path=kwargs["unique_values_path"],
        )

    df['movie_age'] = kwargs['current_year'] - df[kwargs['year_column']]
    df = df.drop(kwargs['year_column'], axis=1)

    # Преобразование типа колонок
    df = transform_types(data=df,
                         change_type_columns=kwargs["change_type_columns"])

    return df

# Получение данных для обучения

In [36]:
# Пример получения данных для модели
# Далее весь процесс будет осуществляться в функции pipeline_evaluate
dummies = preprocess_dummies(df_test, **preproc)
data = preprocess_data(df_test, evaluate=False, **preproc)
data = pd.concat([data, dummies], axis=1)

In [37]:
data

Unnamed: 0,votes_kp,votes_imdb,votes_filmCritics,votes_await,rating_kp,rating_imdb,rating_filmCritics,movieLength,ageRating,ratingMpaa,...,Millennium_Films,NBC_Universal_Television,New_Line_Cinema,Paramount_Pictures,Relativity_Media,Silver_Pictures,Studio_Canal,Universal_Pictures,Village_Roadshow_Pictures,Warner_Bros_Pictures
0,65773.0,126041.0,142.0,14105.0,6.901,6.7,4.9,107.0,16.0,pg13,...,0,0,1,0,0,0,0,0,0,0
1,2520.0,3597.0,11.0,30.0,6.147,6.1,5.1,98.0,16.0,undefined,...,0,0,0,0,0,0,0,0,0,0
2,190210.0,174447.0,246.0,12249.0,7.143,6.6,5.0,135.0,16.0,pg13,...,0,0,0,0,0,0,0,0,0,0
3,434943.0,680751.0,254.0,102689.0,8.020,7.8,6.8,161.0,12.0,pg13,...,0,0,1,0,0,0,0,0,0,0
4,38628.0,13745.0,18.0,5.0,7.968,7.3,6.2,120.0,18.0,r,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
959,6943.0,132411.0,198.0,,6.458,6.9,6.9,128.0,16.0,r,...,0,0,0,0,0,0,0,0,0,0
960,10762.0,36795.0,49.0,2502.0,4.713,5.0,5.2,80.0,18.0,r,...,0,0,0,0,0,0,0,0,0,0
961,2266.0,7013.0,,,7.461,7.4,,95.0,,r,...,0,0,0,0,0,0,0,0,0,0
962,4532.0,8295.0,65.0,,7.311,7.2,7.0,90.0,16.0,undefined,...,0,0,0,0,0,1,0,0,0,0


# Pipeline evaluate

In [44]:
def pipeline_evaluate(
        config_path, dataset: pd.DataFrame = None, data_path: str = None
) -> list:
    """
    Предобработка входных данных и получение предсказаний
    :param dataset: датасет
    :param config_path: путь до конфигурационного файла
    :param data_path: путь до файла с данными
    :return: предсказания
    """
    with open(config_path) as file:
        config = yaml.load(file, Loader=yaml.FullLoader)

    preproc_config = config["preprocessing"]
    train_config = config["train"]
    
    if data_path:
        dataset = pd.read_csv(data_path, sep=preproc_config['data_separator'])
    
    # Преобразование данных
    dummies = preprocess_dummies(dataset, **preproc_config)
    data = preprocess_data(dataset, **preproc_config)
    data = pd.concat([data, dummies], axis=1)
    
    # импорт модели из конфигурационного файла
    model = joblib.load(os.path.join(train_config["model_path"]))
    prediction = model.predict(data)
    
    # Преобразование обратно из логарифмической шкалы
    if preproc_config['log_target']:
        prediction = np.expm1(prediction)

    return prediction.tolist()

In [47]:
result = pipeline_evaluate(config_path=config_path,
                           data_path=evaluate['predict_path'])

In [51]:
# Добавление колонки с предскзанными сборами
df_test['predicted_fees_usa'] = result
df_test[['movie_name', 'predicted_fees_usa']]

Unnamed: 0,movie_name,predeicted_fees_usa
0,Если я останусь,1.927861e+07
1,Герой,1.508542e+05
2,Милые кости,3.285907e+07
3,Хоббит: Пустошь Смауга,2.465977e+08
4,Цветок пустыни,2.527837e+04
...,...,...
959,Сириана,2.941903e+07
960,Репортаж со свадьбы,4.148624e+05
961,Двенадцатилетние,2.858044e+05
962,Полночный поцелуй,1.332649e+05
