# Иллюстрация загрузки и подготовки данных Netflix

### $\textbf{Содержание}$:

### $\textbf{I. Создание табличных данных на основании txt файлов}$
### Из записей txt файлов формируются в табличные данные csv формата с атрибутами:
#### - $\it{movie\_Id} \in \mathbb{N}$: идентификатор предложения; 
#### - $\it{user_\_i_d} \in \mathbb{N}$: идентификатор пользователя;
#### - $\it{rating} \in [1, 5]$: полезность предложения для пользователя;
#### - $\it{date} \in \mathbb{N}$: временная метка;
------


### $\textbf{II. Обработка фильмов}$
### Из названия фильмов генерируются следующие признаки:
#### - год выпуска фильма $\it{year} \in \mathbb{N}$;
### Из оценок, выставленных пользователями, генерируются следующие признаки:
#### - средняя оценка фильма $\it{rating\_avg} \in [0, 5]$;
#### - количество оценок фильма $\it{rating\_cnt} \in \mathbb{N} \cup \{0\}$;
---

### $\textbf{III. Обработка рейтингов}$
#### Никакие признаки на это этапе не генерируются;
#### Атрибут даты приводятся к формату timestamp;


In [None]:
import os
import sys
import pandas as pd
import numpy as np

import re
from datetime import datetime

import tqdm

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder\
    .appName("processingApp")\
    .config("spark.driver.memory", "8G")\
    .config("spark.executor.cores", "8G")\
    .config("spark.executor.memory", "2G")\
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
data_folder = r'./'
row_data_files = ['combined_data_' + str(i) + '.txt' for i in range(1,5)]

movie_titles_path = r'./movie_titles.csv'

save_file_name =  r"./data_clean/netflix_full.csv"

save_movies = r'./data_clean/movies.csv'
save_rating = r'./data_clean/rating.csv'

In [None]:
row_data_files = [os.path.join(data_folder, i) for i in row_data_files]

### I. Создание табличных данных на основании txt файлов

In [None]:
def row_data_ops(
    files: List[str],
    save_file_name: str
):
    """
    Creating table data from txt files
    :param files: txt files names
    :type files: list
    :param save_file_name: file name for saving
    :type save_file_name: str

    
    """
    for _, file_name in enumerate(files):
        df = spark.read.text(os.path.join(file_name))
        df = df.coalesce(1).withColumn("row_num", F.monotonically_increasing_id())

        df_partitions = df.select( F.col("row_num").alias("Id"), 
                                  F.regexp_extract(F.col("value"), r'\d+', 0).alias("Id_start") ).where( F.substring(F.col("value"), -1, 1)==":" )
        df_partitions = df_partitions.select( F.col("Id").cast('int'),
                              F.col("Id_start").cast('int'))

        df_rows = df.select( F.col("row_num").alias("Id"),
                             F.col("value") ).where( F.substring(F.col("value"), -1, 1)!=":" )
        df_rows = df_rows.select( F.col("Id"),
                                  F.regexp_extract(F.col("value"), r'(\d+),(\d+),(\d+-\d+-\d+)', 1).cast('int').alias("user_Id"),
                                  F.regexp_extract(F.col("value"), r'(\d+),(\d+),(\d+-\d+-\d+)', 2).cast('int').alias("rating"),
                                  F.to_date(F.regexp_extract(F.col("value"), r'(\d+),(\d+),(\d+-\d+-\d+)', 3), "yyyy-mm-dd").alias("date"))
        df_partitions2 = df_partitions.select( F.col("Id").alias("Id2"),
                                             F.col("Id_start").alias("Id_end"))
        df_indexes = df_partitions.join(df_partitions2, df_partitions2.Id_end - df_partitions.Id_start == 1, "left").select( 
            F.col('Id').alias('Idx_start'), 
            F.col('Id2').alias('Idx_stop'),
            F.col('Id_start').alias('Index'))

        df_result = df_rows.join(F.broadcast(df_indexes), (df_rows.Id > df_indexes.Idx_start) & ((df_rows.Id < df_indexes.Idx_stop) | (df_indexes.Idx_stop.isNull())), "inner").select(
            F.col('Index').alias('movie_Id'),
            F.col('user_Id'),
            F.col('rating'),
            F.col('date')
        ).distinct()
        
        if _ == 0:
            df_all_users = df_result
        else:
            df_all_users = df_all_users.union(df_result)
            
    df_all_users.write.csv(save_file_name)

In [None]:
row_data_ops(row_data_files, save_file_name)

### II. Обработка фильмов

In [1]:
def movies_ops(
    data_path, 
    movies_path, 
    movie_save_path
):
    
    """
    operate movies name
    :param data_path: path to netflix full file
    :type data_path: str
    :param movies_path: path to netflix movies file
    :type movies_path: str
    :param movie_save_path: file path to save clear netflix movies
    :type movie_save_path: str

    """
    
    df_all = pd.read_csv(data_path)  
    df_movies = pd.merge(df_all.groupby(by = ['movie_Id'], as_index=False).rating.count().rename(columns={'rating':'rating_cnt'}),
                    df_all.groupby(by = ['movie_Id'], as_index=False).rating.mean().rename(columns={'rating':'rating_avg'}), 
                    on = 'movie_Id', how = 'inner')
    
    with open(movies_path, 'r', encoding="ISO-8859-1") as f:
        file = f.read()
    file_arr = file.split('\n')
    
    file_arr_id = []
    file_arr_year = []
    file_arr_name = []

    file_arr_problem = []

    for i in file_arr:
        row = re.sub(r'^\s+', '', i)
        row = re.sub(r'\s+$', '', i)
        row_group = re.match(r'(\d+),(\d+),(.+)', row)
        if row_group != None:
            assert row == row_group.group(0)

            file_arr_id.append(int(row_group.group(1)))
            file_arr_year.append(int(row_group.group(2)))
            file_arr_name.append(row_group.group(3))

        else:
            file_arr_problem.append(row)

    
    df_names = pd.DataFrame({ 'movie_Id':file_arr_id, 'year':file_arr_year, 'title':file_arr_name })
    fill_na_year = ['2002', '2002', '2002', '1974', '1999', '1994', '1999']
    fill_na_name = []
    fill_na_id = []

    for i in range(len(file_arr_problem)-1):
        row_group = re.match(r'(\d+),(NULL),(.+)', file_arr_problem[i])

        fill_na_id.append(int(row_group.group(1)))
        fill_na_name.append(row_group.group(3))

    df_names = pd.concat([df_names, pd.DataFrame({ 'movie_Id':fill_na_id, 'year':fill_na_year, 'title':fill_na_name })])
    df_names.movie_Id = df_names.movie_Id.astype('int')
    df_names.year = df_names.year.astype('int')
    
    df_movies = pd.merge(df_movies, df_names, on = 'movie_Id', how = 'left')
    df_movies.reset_index(drop=True).to_csv(movie_save_path, index=False)

In [None]:
movies_ops(save_file_name, movie_titles_path, save_movies)

### III. Обработка рейтингов

In [2]:
def rating_op(
    data_path, 
    rating_save_path
):
    """
    operate ratings 
    :param data_path: path to netflix full file
    :type data_path: str
    :param rating_save_path: file path to save operated netflix ratings
    :type rating_save_path: str

    """
    
    df_rating = pd.read_csv(data_path) 
    df_rating['timestamp'] = df_rating.date.apply(lambda x: pd.to_datetime(x))
    df_rating['timestamp'] = df_rating.timestamp.apply(lambda x: x.timestamp())
    df_rating = df_rating[['movie_Id', 'user_Id', 'rating', 'timestamp']]
    
    df_rating.reset_index(drop=True).to_csv(rating_save_path, index=False)

In [None]:
rating_op(save_file_name, save_rating)