# Feature Engineering for Deep Learning
### 数据预处理：分别对数值型和类别型数据进行encode

In [22]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [23]:
spark = SparkSession \
    .builder \
    .appName("concrec-rank") \
    .config("spark.driver.memory", "11g") \
    .getOrCreate()

## Load Data

In [24]:
anime_df = spark.read.csv('../../data/anime/parsed_anime.csv', header=True, inferSchema=True)

In [25]:
anime_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: string (nullable = true)
 |-- aired_to: integer (nullable = true)



In [26]:
# cast aired_from into int
from pyspark.sql.types import IntegerType
anime_df = anime_df.withColumn('aired_from', col('aired_from').cast('int'))

In [27]:
anime_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)



In [28]:
anime_df.show(5)

+--------+--------------------+--------------------+-----+--------+------+-------+--------------------------+--------------------+--------------------+----------+----------+
|anime_id|                name|               genre| type|episodes|rating|members|            japanese_title|               aired|           image_url|aired_from|  aired_to|
+--------+--------------------+--------------------+-----+--------+------+-------+--------------------------+--------------------+--------------------+----------+----------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|                君の名は。|        Aug 26, 2016|https://cdn.myani...|1472140800|1472140800|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|鋼の錬金術師 FULLMETAL ...|Apr 5, 2009 to Ju...|https://cdn.myani...|1238860800|1278172800|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|                     銀魂°|Apr 8, 2015 to Ma...|https://cdn.m

In [29]:
rating_df = spark.read.csv('../../data/anime/rating.csv', header=True, inferSchema=True)

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
                                                                                

In [30]:
rating_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [31]:
# valid rating only
rating_df = rating_df.filter(rating_df['rating'] > 0)

In [32]:
rating_df.show(5)

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|    8074|    10|
|      1|   11617|    10|
|      1|   11757|    10|
|      1|   15451|    10|
|      2|   11771|    10|
+-------+--------+------+
only showing top 5 rows



## Merge rating with anime

In [33]:
merged_df = rating_df.join(
    anime_df.select('anime_id', 'name', 'genre', 'type', 'episodes', 
                    'rating', 'members', 'aired_from', 'aired_to').withColumnRenamed('rating', 'all_rating'), 
    on=['anime_id'], how='left'
)

In [34]:
merged_df.show(5)

+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+
|anime_id|user_id|rating|                name|               genre|type|episodes|all_rating|members|aired_from|  aired_to|
+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+
|    8074|      1|    10|Highschool of the...|Action, Ecchi, Ho...|  TV|      12|      7.46| 535892|1278259200|1284912000|
|   11617|      1|    10|     High School DxD|Comedy, Demons, E...|  TV|      12|       7.7| 398660|1325779200|1332432000|
|   11757|      1|    10|    Sword Art Online|Action, Adventure...|  TV|      25|      7.83| 893100|1341676800|1356192000|
|   15451|      1|    10| High School DxD New|Action, Comedy, D...|  TV|      12|      7.87| 266657|1373126400|1379779200|
|   11771|      2|    10|    Kuroko no Basket|Comedy, School, S...|  TV|      25|      8.46| 338315|1333814400|1348243200|
+--------+------

## Build Label

把用户的打分转换成是否喜欢：大于7.5分为喜欢（1），否则为不喜欢（0）        
可以尝试不同策略，如直接用评分值

In [35]:
like_threshold = 7.5

def build_label(df):
    return df.withColumn('label',
                         when(col('rating') >= like_threshold, 1).otherwise(0)
                        )

In [36]:
labeled_df = build_label(merged_df)
labeled_df.show(5)

+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+-----+
|anime_id|user_id|rating|                name|               genre|type|episodes|all_rating|members|aired_from|  aired_to|label|
+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+-----+
|    8074|      1|    10|Highschool of the...|Action, Ecchi, Ho...|  TV|      12|      7.46| 535892|1278259200|1284912000|    1|
|   11617|      1|    10|     High School DxD|Comedy, Demons, E...|  TV|      12|       7.7| 398660|1325779200|1332432000|    1|
|   11757|      1|    10|    Sword Art Online|Action, Adventure...|  TV|      25|      7.83| 893100|1341676800|1356192000|    1|
|   15451|      1|    10| High School DxD New|Action, Comedy, D...|  TV|      12|      7.87| 266657|1373126400|1379779200|    1|
|   11771|      2|    10|    Kuroko no Basket|Comedy, School, S...|  TV|      25|      8.46| 3383

## Sliding Window
这里要在df的每个row上，额外增加和用户相关的信息。比如该用户最爱的电影类型、该用户看过多少电影、平均打分是多少        
为了防止泄露未来信息，需把所有评分按照时间顺序排序，然后用滑动窗口聚合        
理论应该使用评分时间，但是由于没有这个数据，所以采用电影上映时间

In [37]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pyspark.sql.types as types

In [38]:
windowSpec = Window \
    .partitionBy('user_id') \
    .orderBy('aired_from') \
    .rowsBetween(-100, -1)

In [39]:
# 帮助方法：对于某一列，在聚合的时候，如果用户不喜欢这个电影，则不聚合这个电影的信息
likedMoviesCol = lambda cname: when(col('label') == 1, col(cname)).otherwise(lit(None))

@udf(types.ArrayType(types.StringType()))
def most_liked_genres(gen_strs):
    """
    gen_strs = ["Action, Adventure, Drama", "Comedy, Drama, School"]
    """
    gens = [s.split(",") for s in gen_strs]
    gens = [x for l in gens for x in l] # flatten
    gens = [s.strip() for s in gens]
    
    gen_set = set(gens)
    count_occur = lambda gen, l: len([g for g in l if g == gen])
    gen_with_occur = [(gen, count_occur(gen, gens)) for gen in gen_set]
    gen_with_occur.sort(key=lambda x: x[1], reverse=True)
    
    # pick 3 most liked genres
    return [x[0] for x in gen_with_occur[:5]]

In [40]:
NUMBER_PRECISION = 2

feat_df = labeled_df \
    .withColumn('user_rating_cnt', count(lit(1)).over(windowSpec)) \
    .withColumn('user_rating_ave', mean(col('rating')).over(windowSpec)) \
    .withColumn('user_rating_ave', F.round(col('user_rating_ave'), NUMBER_PRECISION)) \
    .withColumn('user_rating_std', stddev(col('rating')).over(windowSpec)) \
    .withColumn('user_rating_std', F.round(col('user_rating_std'), NUMBER_PRECISION)) \
    .withColumn('user_aired_from_ave', mean(likedMoviesCol('aired_from')).over(windowSpec)) \
    .withColumn('user_aired_from_ave', F.round(col('user_aired_from_ave'), 0)) \
    .withColumn('user_aired_to_ave', mean(likedMoviesCol('aired_to')).over(windowSpec)) \
    .withColumn('user_aired_to_ave', F.round(col('user_aired_to_ave'), 0)) \
    .withColumn('user_liked_genres', most_liked_genres(collect_list(likedMoviesCol('genre')).over(windowSpec)))

In [41]:
feat_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- user_rating_cnt: long (nullable = false)
 |-- user_rating_ave: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_aired_from_ave: double (nullable = true)
 |-- user_aired_to_ave: double (nullable = true)
 |-- user_liked_genres: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [42]:
feat_df.select('anime_id', 'user_id', 'rating',
#                'user_rating_cnt', 'user_rating_ave', 'user_rating_std',
#                'user_aired_from_ave', 'user_aired_to_ave'
               'genre', 'user_liked_genres'
              ).head(10)

                                                                                

[Row(anime_id=1090, user_id=148, rating=6, genre='Action, Adventure, Drama, Mecha, Military, Sci-Fi, Space', user_liked_genres=[]),
 Row(anime_id=1091, user_id=148, rating=6, genre='Action, Drama, Mecha, Military, Sci-Fi, Space', user_liked_genres=[]),
 Row(anime_id=1092, user_id=148, rating=6, genre='Action, Adventure, Drama, Mecha, Military, Sci-Fi, Space', user_liked_genres=[]),
 Row(anime_id=813, user_id=148, rating=10, genre='Action, Adventure, Comedy, Fantasy, Martial Arts, Shounen, Super Power', user_liked_genres=[]),
 Row(anime_id=170, user_id=148, rating=9, genre='Comedy, Drama, School, Shounen, Sports', user_liked_genres=['Comedy', 'Adventure', 'Shounen', 'Action', 'Fantasy']),
 Row(anime_id=30, user_id=148, rating=6, genre='Action, Dementia, Drama, Mecha, Psychological, Sci-Fi', user_liked_genres=['Comedy', 'Shounen', 'Sports', 'Adventure', 'Drama']),
 Row(anime_id=81, user_id=148, rating=8, genre='Adventure, Drama, Mecha, Military, Romance, Sci-Fi', user_liked_genres=['Come

## Encoding
### 将数值型和分类型特征分布进行encode表达

### 1. Genres: multi-hot

In [43]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
import pyspark.sql.types as types
from pyspark.ml.linalg import SparseVector
import numpy as np

In [44]:
# 1. parse genre to list
@udf(returnType='array<string>')
def genre_to_list(gen_str):
    if gen_str is None:
        return []
    
    gens = gen_str.split(",")
    return [gen.strip() for gen in gens]

genres_df = feat_df.withColumn('genres', genre_to_list(col('genre'))).drop('genre')

Traceback (most recent call last):
  File "/Users/daniel/Programs/machine-learning/recsys-lecture/concrec/rank-service/venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/daniel/Programs/machine-learning/recsys-lecture/concrec/rank-service/venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/daniel/Programs/machine-learning/recsys-lecture/concrec/rank-service/venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/daniel/Programs/machine-learning/recsys-lecture/concrec/rank-service/venv/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [45]:
genres_df.head(5)

                                                                                

[Row(anime_id=1090, user_id=148, rating=6, name='Mobile Suit Gundam I', type='Movie', episodes='1', all_rating=7.43, members=12877, aired_from=353347200, aired_to=353347200, label=0, user_rating_cnt=0, user_rating_ave=None, user_rating_std=None, user_aired_from_ave=None, user_aired_to_ave=None, user_liked_genres=[], genres=['Action', 'Adventure', 'Drama', 'Mecha', 'Military', 'Sci-Fi', 'Space']),
 Row(anime_id=1091, user_id=148, rating=6, name='Mobile Suit Gundam II: Soldiers of Sorrow', type='Movie', episodes='1', all_rating=7.56, members=11325, aired_from=363628800, aired_to=363628800, label=0, user_rating_cnt=1, user_rating_ave=6.0, user_rating_std=None, user_aired_from_ave=None, user_aired_to_ave=None, user_liked_genres=[], genres=['Action', 'Drama', 'Mecha', 'Military', 'Sci-Fi', 'Space']),
 Row(anime_id=1092, user_id=148, rating=6, name='Mobile Suit Gundam III: Encounters in Space', type='Movie', episodes='1', all_rating=7.89, members=11343, aired_from=384796800, aired_to=3847968

In [46]:
feat_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- user_rating_cnt: long (nullable = false)
 |-- user_rating_ave: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_aired_from_ave: double (nullable = true)
 |-- user_aired_to_ave: double (nullable = true)
 |-- user_liked_genres: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [47]:
def encode_genres_col(index_mapping_broadcasted):
    @udf(returnType='array<int>')
    def encode_genres_col(genres, max_genre_index):
        """
        用已经训练好的string index mapping对genres数组进行encode
        """
        if genres is None:
            genres = []
        gen_vec = [index_mapping_broadcasted.value.get(gen) for gen in genres]
        gen_vec = list(set(gen_vec)) # dedup

        # convert genre vector to multi-hot
        fill = np.ones(len(gen_vec), dtype=np.int32)
        sorted_index = np.sort(gen_vec)
        multihot_vec = SparseVector(max_genre_index + 1, sorted_index, fill)
        return multihot_vec.toArray().astype(np.int32).tolist()
    return encode_genres_col
    

def multi_hot_encode_genres(featdf):
    df = featdf.withColumn('genre_item', explode(col('genres')))
    
    genre_string_indexer = StringIndexer(inputCol='genre_item', outputCol='genre_index')
    indexer_model = genre_string_indexer.fit(df)
    
    # get mapping from string indexer
    gens_df = spark.createDataFrame(
        [{'genre_item': g} for g in indexer_model.labels]
    )
    mapping_df = indexer_model.transform(gens_df).collect()
    mapping_dict = {row.genre_item: int(row.genre_index) for row in mapping_df}
    max_genre_index = __builtin__.max(mapping_dict.values())
    broadcasted = spark.sparkContext.broadcast(mapping_dict)
    
    encode_fn = encode_genres_col(broadcasted)
   
    return featdf \
        .withColumn( 'genres_multihot', encode_fn(col('genres'), lit(max_genre_index)) ) \
        .withColumn( 'user_liked_genres_multihot', encode_fn(col('user_liked_genres'), lit(max_genre_index)) )

In [48]:
genre_encoded_df = multi_hot_encode_genres(genres_df)

                                                                                

In [49]:
genre_encoded_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- user_rating_cnt: long (nullable = false)
 |-- user_rating_ave: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_aired_from_ave: double (nullable = true)
 |-- user_aired_to_ave: double (nullable = true)
 |-- user_liked_genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_liked_genres_multihot: array (nu

In [50]:
# genre_encoded_df.head(5)
# genre_encoded_df.collect()

### 2.  min max scaler for numeric features

In [51]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

In [52]:
@udf(types.FloatType())
def extract_float(l):
    r = __builtin__.round(l[0], NUMBER_PRECISION)
        
    return float(r)


def min_max_scale(featdf, col):
    output_col = f"{col}_min_max"
    vec_assembler = VectorAssembler(inputCols=[col], outputCol=f"{col}_vec", handleInvalid='keep')
    min_max_scaler = MinMaxScaler(inputCol=f"{col}_vec", outputCol=output_col)
    pipeline = Pipeline(stages=[vec_assembler, min_max_scaler])
    
    return pipeline \
        .fit(featdf) \
        .transform(featdf) \
        .drop(f"{col}_vec") \
        .withColumn(output_col, extract_float(F.col(output_col)))

In [53]:
scaled_df = genre_encoded_df

In [54]:
scaled_df = min_max_scale(scaled_df, 'all_rating')
scaled_df = min_max_scale(scaled_df, 'members')
scaled_df = min_max_scale(scaled_df, 'aired_from')
scaled_df = min_max_scale(scaled_df, 'aired_to')
scaled_df = min_max_scale(scaled_df, 'user_rating_ave')
scaled_df = min_max_scale(scaled_df, 'user_rating_std')
scaled_df = min_max_scale(scaled_df, 'user_aired_from_ave')
scaled_df = min_max_scale(scaled_df, 'user_aired_to_ave')

                                                                                

In [55]:
scaled_df.select('anime_id', 'user_id', 
                 'user_aired_from_ave', 'user_aired_from_ave_min_max'
                ).show(1000)



+--------+-------+-------------------+---------------------------+
|anime_id|user_id|user_aired_from_ave|user_aired_from_ave_min_max|
+--------+-------+-------------------+---------------------------+
|    1090|    148|               null|                        NaN|
|    1091|    148|               null|                        NaN|
|    1092|    148|               null|                        NaN|
|     813|    148|               null|                        NaN|
|     170|    148|         6.095196E8|                       0.73|
|      30|    148|         6.801102E8|                       0.75|
|      81|    148|         6.801102E8|                       0.75|
|     225|    148|         7.275732E8|                       0.76|
|    2961|    148|         7.275732E8|                       0.76|
|     552|    148|         7.758423E8|                       0.78|
|    2397|    148|        8.0482104E8|                       0.79|
|    1313|    148|         8.293674E8|                        

                                                                                

### Pick useful features

In [56]:
scaled_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- user_rating_cnt: long (nullable = false)
 |-- user_rating_ave: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_aired_from_ave: double (nullable = true)
 |-- user_aired_to_ave: double (nullable = true)
 |-- user_liked_genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_liked_genres_multihot: array (nu

In [57]:
output_df = scaled_df.select('anime_id', 'user_id', 'label', 
                             'all_rating_min_max', 'members_min_max', 
                             'aired_from_min_max', 'aired_to_min_max',
                             'genres_multihot',
                             'user_rating_ave_min_max', 'user_rating_std_min_max',
                             'user_aired_from_ave_min_max', 'user_aired_to_ave_min_max',
                             'user_liked_genres_multihot'
                            )

In [58]:
output_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- label: integer (nullable = false)
 |-- all_rating_min_max: float (nullable = true)
 |-- members_min_max: float (nullable = true)
 |-- aired_from_min_max: float (nullable = true)
 |-- aired_to_min_max: float (nullable = true)
 |-- genres_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- user_rating_ave_min_max: float (nullable = true)
 |-- user_rating_std_min_max: float (nullable = true)
 |-- user_aired_from_ave_min_max: float (nullable = true)
 |-- user_aired_to_ave_min_max: float (nullable = true)
 |-- user_liked_genres_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)



### Output

In [60]:
output_df.fillna(0) \
    .write \
    .mode('overwrite') \
    .save('../../data/anime/dnn_feat_eng')
#     .format('csv').option("header", "true") \

                                                                                