In [49]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

![alt text](FeatEngImg/1.png "Title")
![alt text](FeatEngImg/2.png "Title")

## 聚合user id
![alt text](FeatEngImg/3.png "Title")

## 删除未来特征
![alt text](FeatEngImg/4.png "Title")

In [50]:
# Create (or reuse) this notebook's Spark session. `spark.driver.memory` only
# takes effect if the driver JVM has not been started yet.
spark = SparkSession.builder \
    .appName("concrec-rank") \
    .config("spark.driver.memory", "11g") \
    .getOrCreate()

## Load Data


In [51]:
from pyspark.sql.types import IntegerType

# Load the anime catalogue, cast `aired_from` to int, and rename the catalogue's
# own `rating` to `all_rating` so it cannot collide with the per-user `rating`
# column of rating.csv after the join below.
anime_df = (
    spark.read.csv("../anime-data/parsed_anime.csv", header=True, inferSchema=True)
    .withColumn("aired_from", col("aired_from").cast(IntegerType()))
    .withColumnRenamed("rating", "all_rating")
)

anime_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)



In [52]:
# Load user ratings and keep only positive scores (presumably -1 marks
# "watched but not rated" -- confirm against the dataset description).
rating_df = spark.read.csv(
    "../anime-data/rating.csv", header=True, inferSchema=True
).where(col("rating") > 0)
rating_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



                                                                                

In [53]:
# A rating at or above this threshold counts as a "like" (label 1). Ratings are
# integers, so 7.5 effectively means rating >= 8.
like_threshold = 7.5

merged_df = (
    rating_df.join(
        anime_df.select(
            "anime_id", "name", "genre", "type", "episodes",
            "all_rating", "members", "aired_from", "aired_to",
        ),
        on=["anime_id"],
        how="left",
    )
    .withColumn("label", when(col("rating") >= like_threshold, 1).otherwise(0))
)

## Feature Builder


## 1. numeric features: min-max


In [54]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
import pyspark.sql.types as types
from pyspark.sql import functions as F

In [55]:
import builtins

NUMBER_PRECISION = 2


@udf(types.FloatType())
def extract_float(l):
    """Unwrap a 1-element ML vector into a float rounded to NUMBER_PRECISION.

    Used on the single-feature vectors produced by ``NumericScaler``.
    Returns None when the vector itself is null.

    Note: the column type is 32-bit FloatType, so a rounded value such as 0.99
    reads back in Python as 0.9900000095367432.
    """
    if l is None:
        return None
    # `round` is shadowed by pyspark.sql.functions.round (star import), so call
    # the Python builtin explicitly. `builtins` exists in any Python 3 process,
    # unlike `__builtin__`, which only IPython injects.
    return float(builtins.round(float(l[0]), NUMBER_PRECISION))

In [56]:
class NumericScaler:
    """Min-max scale numeric columns independently, one output column each.

    For every input column ``c`` the fitted transform adds ``c_min_max``. This is
    a float rounded by ``extract_float``, scaled by a MinMaxScaler fitted on
    that column alone. The VectorAssembler uses ``handleInvalid="keep"`` so
    null inputs do not raise.
    """

    def __init__(self, cols):
        self.cols = cols
        self.model = None  # set by fit()
        self.pipeline = self.__build_min_max_scalers(cols)

    def __build_min_max_scalers(self, cols):
        # One sub-pipeline per column, chained into a single Pipeline.
        pipelines = [self.__build_one_min_max_scaler(colname) for colname in cols]
        return Pipeline(stages=pipelines)

    def __build_one_min_max_scaler(self, colname):
        # MinMaxScaler only accepts vector input, so wrap the scalar column in
        # a 1-element vector first.
        vec_col = f"{colname}_vec"
        vec_assembler = VectorAssembler(
            inputCols=[colname], outputCol=vec_col, handleInvalid="keep"
        )
        min_max_scaler = MinMaxScaler(
            inputCol=vec_col, outputCol=f"{colname}_min_max"
        )
        return Pipeline(stages=[vec_assembler, min_max_scaler])

    def fit(self, df):
        """Fit every per-column scaler on ``df``; returns ``self`` for chaining."""
        self.model = self.pipeline.fit(df)
        return self

    def transform(self, df):
        """Add ``<col>_min_max`` float columns and drop the intermediate vectors.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if getattr(self, "model", None) is None:
            raise RuntimeError("NumericScaler.transform() called before fit()")

        result = self.model.transform(df)

        # drop all intermediate vector cols and convert each output to float
        for colname in self.cols:
            output_col = f"{colname}_min_max"
            result = result.drop(f"{colname}_vec").withColumn(
                output_col, extract_float(F.col(output_col))
            )

        return result

In [57]:
# Item-side numeric columns, each min-max scaled independently.
item_numeric_cols = ["all_rating", "members", "aired_from", "aired_to"]
item_numeric_scaler = NumericScaler(cols=item_numeric_cols)

In [58]:
# User-side numeric aggregates (built later in feat_df) to min-max scale.
user_numeric_cols = [
    f"user_{stat}"
    for stat in ("rating_ave", "rating_std", "aired_from_ave", "aired_to_ave")
]

In [59]:
# One min-max scaler per user aggregate column.
user_numeric_scaler = NumericScaler(cols=user_numeric_cols)

## 2. Categorical features: multi-hot


In [60]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
import pyspark.sql.types as types
from pyspark.ml.linalg import SparseVector
import numpy as np

In [61]:
# 用已经训练好的string index mapping对genres数组进行encode
def encode_genres_col(index_mapping_broadcasted):
    @udf(returnType="array<int>")
    def encode_genres_col(genres, max_genre_index):
        if genres is None:
            genres = []
        gen_vec = [index_mapping_broadcasted.value.get(gen) for gen in genres]
        gen_vec = list(set(gen_vec))  # dedup

        # convert genre vector to multi-hot
        fill = np.ones(len(gen_vec), dtype=np.int32)
        sorted_index = np.sort(gen_vec)
        multihot_vec = SparseVector(max_genre_index + 1, sorted_index, fill)
        return multihot_vec.toArray().astype(np.int32).tolist()

    return encode_genres_col

In [62]:
class CategoricalEncoder:
    """Multi-hot encode an array<string> column against a genre vocabulary.

    ``fit`` learns a genre -> index mapping with a StringIndexer over the
    exploded ``vocab_colname`` column. ``transform`` adds ``<colname>_multihot``,
    an array<int> of length ``max_genre_index + 1``.

    The vocabulary defaults to the item ``genres`` column for every encoder.
    The previous code always did this, silently ignoring ``colname``. As a
    result, item and user multi-hot vectors share one index space.

    Args:
        colname: array<string> column encoded by ``transform``.
        vocab_colname: array<string> column the vocabulary is learned from.
    """

    def __init__(self, colname, vocab_colname="genres"):
        self.colname = colname
        self.vocab_colname = vocab_colname

    def fit(self, df):
        """Learn the genre vocabulary from ``df`` and build the encoding UDF."""
        exploded_df = df.withColumn("genre_item", explode(col(self.vocab_colname)))

        genre_string_indexer = StringIndexer(
            inputCol="genre_item", outputCol="genre_index"
        )
        indexer_model = genre_string_indexer.fit(exploded_df)

        # StringIndexer assigns index i to labels[i], so the mapping can be read
        # straight off the fitted model without running another Spark job.
        mapping_dict = {
            genre: index for index, genre in enumerate(indexer_model.labels)
        }
        self.max_genre_index = len(mapping_dict) - 1
        broadcasted = spark.sparkContext.broadcast(mapping_dict)

        self.encode_fn = encode_genres_col(broadcasted)
        return self

    def transform(self, df):
        """Add the ``<colname>_multihot`` column.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if getattr(self, "encode_fn", None) is None:
            raise RuntimeError("CategoricalEncoder.transform() called before fit()")
        return df.withColumn(
            f"{self.colname}_multihot",
            self.encode_fn(col(self.colname), lit(self.max_genre_index)),
        )

In [63]:
# Multi-hot encoder for each item's `genres` array.
item_categorical_encoder = CategoricalEncoder(colname="genres")

In [64]:
# Multi-hot encoder for each user's most-liked genres.
user_categorical_encoder = CategoricalEncoder(colname="user_liked_genres")

## Build Dataset for DNN Training


In [65]:
from pyspark.sql.window import Window

In [66]:
def likedMoviesCol(colname):
    """Return column `colname` on rows the user liked (label == 1), else null.

    Spark aggregates such as mean/collect_list skip nulls, so wrapping a column
    with this helper restricts the aggregate to titles the user liked.
    """
    is_liked = col("label") == 1
    return when(is_liked, col(colname)).otherwise(lit(None))


@udf(returnType="array<string>")
def genre_to_list(gen_str):
    """Split a comma-separated genre string into trimmed genre names.

    A null genre string yields an empty list.
    """
    return [] if gen_str is None else [part.strip() for part in gen_str.split(",")]


# Pick the 5 genres that occur most often across the given genre strings.
@udf(types.ArrayType(types.StringType()))
def most_liked_genres(gen_strs):
    """Return the 5 most frequent genres across comma-separated genre strings.

    gen_strs = ["Action, Adventure, Drama", "Comedy, Drama, School"]
    -> ["Drama", "Action", "Adventure", "Comedy", "School"]

    Counting is a single pass over a dict. Ties keep first-occurrence order,
    so the result does not depend on set/hash iteration order. Null or empty
    entries are ignored, and a null input yields an empty list.
    """
    counts = {}
    for gen_str in gen_strs or []:
        if not gen_str:
            continue
        for gen in gen_str.split(","):
            gen = gen.strip()
            if gen:
                counts[gen] = counts.get(gen, 0) + 1

    # sorted() is stable, so equal counts stay in first-seen order.
    ranked = sorted(counts, key=counts.get, reverse=True)
    return ranked[:5]

In [67]:
# Per-user history window, ordered by aired_from. Each row sees at most the
# USER_HISTORY_ROWS rows *before* it; the current row is excluded, so aggregate
# features never include the rating being predicted (no future information).
# anime_id breaks ties between titles with the same aired_from; without it the
# frame contents, and therefore the features, could differ between runs.
# NOTE(review): rating.csv has no timestamp, so aired date only approximates
# the order in which a user actually rated titles.
USER_HISTORY_ROWS = 100

windowSpec = (
    Window.partitionBy("user_id")
    .orderBy("aired_from", "anime_id")
    .rowsBetween(-USER_HISTORY_ROWS, -1)
)

In [68]:
# Attach per-row user-history features computed over `windowSpec`, i.e. over
# the user's preceding rows only. Rating statistics are rounded to
# NUMBER_PRECISION decimals. The aired-date averages and the liked-genre list
# use only liked titles, and the averages are rounded to 0 decimals.
feat_df = merged_df.select(
    "*",
    genre_to_list(col("genre")).alias("genres"),
    count(lit(1)).over(windowSpec).alias("user_rating_cnt"),
    F.round(mean(col("rating")).over(windowSpec), NUMBER_PRECISION).alias(
        "user_rating_ave"
    ),
    F.round(stddev(col("rating")).over(windowSpec), NUMBER_PRECISION).alias(
        "user_rating_std"
    ),
    F.round(mean(likedMoviesCol("aired_from")).over(windowSpec), 0).alias(
        "user_aired_from_ave"
    ),
    F.round(mean(likedMoviesCol("aired_to")).over(windowSpec), 0).alias(
        "user_aired_to_ave"
    ),
    most_liked_genres(collect_list(likedMoviesCol("genre")).over(windowSpec)).alias(
        "user_liked_genres"
    ),
)

In [69]:
# Peek at the first rows, one field per line.
feat_df.show(n=5, vertical=True)



-RECORD 0-----------------------------------
 anime_id            | 523                  
 user_id             | 31                   
 rating              | 7                    
 name                | Tonari no Totoro     
 genre               | Adventure, Comedy... 
 type                | Movie                
 episodes            | 1                    
 all_rating          | 8.48                 
 members             | 271484               
 aired_from          | 577123200            
 aired_to            | 577123200            
 label               | 0                    
 genres              | [Adventure, Comed... 
 user_rating_cnt     | 0                    
 user_rating_ave     | NULL                 
 user_rating_std     | NULL                 
 user_aired_from_ave | NULL                 
 user_aired_to_ave   | NULL                 
 user_liked_genres   | []                   
-RECORD 1-----------------------------------
 anime_id            | 15                   
 user_id  

                                                                                

## Fit feature transformers

In [70]:
# Learn per-column min/max for the item numeric features.
item_numeric_scaler.fit(df=feat_df)

                                                                                

In [71]:
# Learn per-column min/max for the user aggregate features.
user_numeric_scaler.fit(df=feat_df)

ERROR:root:KeyboardInterrupt while sending command.               (0 + 12) / 13]
Traceback (most recent call last):
  File "/Users/jiaronghe/.pyenv/versions/3.11.3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jiaronghe/.pyenv/versions/3.11.3/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jiaronghe/.pyenv/versions/3.11.3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

In [None]:
# Learn the genre vocabulary for the item multi-hot encoding.
item_categorical_encoder.fit(df=feat_df)

                                                                                

In [None]:
# Learn the vocabulary for the user's liked-genres encoding. CategoricalEncoder.fit
# builds it from feat_df's `genres` column, the same data the item encoder uses.
user_categorical_encoder.fit(df=feat_df)

                                                                                

## Transform

## Transform data for DNN Training


In [None]:
# Scale item numeric columns, then user aggregates, on the training rows.
transformed_df = user_numeric_scaler.transform(item_numeric_scaler.transform(feat_df))

In [None]:
# Peek at the scaled training rows, one field per line.
transformed_df.show(n=5, vertical=True)

24/04/25 20:11:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 65:>                                                         (0 + 1) / 1]

-RECORD 0-------------------------------------------
 anime_id                    | 523                  
 user_id                     | 31                   
 rating                      | 7                    
 name                        | Tonari no Totoro     
 genre                       | Adventure, Comedy... 
 type                        | Movie                
 episodes                    | 1                    
 all_rating                  | 8.48                 
 members                     | 271484               
 aired_from                  | 577123200            
 aired_to                    | 577123200            
 label                       | 0                    
 genres                      | [Adventure, Comed... 
 user_rating_cnt             | 0                    
 user_rating_ave             | NULL                 
 user_rating_std             | NULL                 
 user_aired_from_ave         | NULL                 
 user_aired_to_ave           | NULL           

                                                                                

In [None]:
# Append the item and user genre multi-hot columns.
transformed_df = user_categorical_encoder.transform(
    item_categorical_encoder.transform(transformed_df)
)

## Transform data for Feature Serving

### 1. Item


In [None]:
# Full item catalogue with the `genres` array column the item encoder expects.
anime_genres_df = anime_df.withColumn("genres", genre_to_list(col("genre")))

In [None]:
# Item numerical features used for online serving.
item_numeric_transformed_df = item_numeric_scaler.transform(df=anime_genres_df)

In [None]:
# Add the genre multi-hot on top of the scaled item columns (serving features).
item_both_transformed_df = item_categorical_encoder.transform(
    df=item_numeric_transformed_df
)

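### 2. User

#### 2.1 Build the user feature DataFrame

之前构造的 feat_df 已经包含每个用户最新的喜好数据了，
只要选择 feat_df 中每个用户的最新一行即可，
这里同样可以按照 aired_from 排序，构造 window 进行筛选。
注意：feat_df 的聚合窗口只覆盖当前行之前的行（`rowsBetween(..., -1)`），因此最新一行的用户特征不包含该行自身的评分。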

In [None]:
# Keep exactly one row per user: the latest by aired_from, with anime_id as a
# tie-breaker.
# - Filtering on `aired_from == max(aired_from)` would keep several rows when
#   titles share the latest date, giving duplicate Redis writes with an
#   arbitrary winner.
# - That filter would also drop users whose aired_from values are all null.
#   `desc()` sorts nulls last, so such users keep one row here.
# NOTE(review): feat_df's window covers only the *preceding* rows, so the
# user's latest rating itself is not part of the served user features --
# confirm this train/serve difference is intended.
w = Window.partitionBy("user_id").orderBy(
    F.col("aired_from").desc(), F.col("anime_id").desc()
)

user_feat_df = (
    feat_df.withColumn("_latest_rank", F.row_number().over(w))
    .where(F.col("_latest_rank") == 1)
    .drop("_latest_rank")
)

In [None]:
# User numerical features used for online serving.
user_numeric_transformed_df = user_numeric_scaler.transform(df=user_feat_df)

In [None]:
# Add the liked-genres multi-hot on top of the scaled user columns (serving features).
user_both_transformed_df = user_categorical_encoder.transform(
    df=user_numeric_transformed_df
)

## Save

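### 1. Save transformed_df for DNN training

**TODO:** not implemented yet — no cell in this notebook writes `transformed_df` to storage.

### 2. Save item and user features to Redis

#### 2.1 Collect both datasets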


In [None]:
# Bring every item feature row to the driver for the Redis upload below.
item_features = item_both_transformed_df.collect()

                                                                                

In [None]:
# Inspect one collected item row.
item_features[1]

Row(anime_id=5114, name='Fullmetal Alchemist: Brotherhood', genre='Action, Adventure, Drama, Fantasy, Magic, Military, Shounen', type='TV', episodes='64', all_rating=9.26, members=793665, japanese_title='鋼の錬金術師 FULLMETAL ALCHEMIST', aired='Apr 5, 2009 to Jul 4, 2010', image_url='https://cdn.myanimelist.net/images/anime/1223/96541.jpg', aired_from=1238860800, aired_to=1278172800, genres=['Action', 'Adventure', 'Drama', 'Fantasy', 'Magic', 'Military', 'Shounen'], all_rating_min_max=0.9900000095367432, members_min_max=0.7799999713897705, aired_from_min_max=0.8999999761581421, aired_to_min_max=0.8999999761581421, genres_multihot=[0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [None]:
# Bring every user feature row (one per user) to the driver for the Redis upload.
user_features = user_both_transformed_df.collect()

                                                                                

In [None]:
# Inspect one collected user row.
user_features[1]

Row(anime_id=16001, user_id=53, rating=9, name='Kokoro Connect: Michi Random', genre='Comedy, Drama, Romance, School, Slice of Life, Supernatural', type='Special', episodes='4', all_rating=8.19, members=106989, aired_from=1353254400, aired_to=1355068800, label=1, genres=['Comedy', 'Drama', 'Romance', 'School', 'Slice of Life', 'Supernatural'], user_rating_cnt=39, user_rating_ave=7.62, user_rating_std=1.76, user_aired_from_ave=1238832000.0, user_aired_to_ave=1247838171.0, user_liked_genres=['Romance', 'School', 'Comedy', 'Drama', 'Slice of Life'], user_rating_ave_min_max=0.7400000095367432, user_rating_std_min_max=0.2800000011920929, user_aired_from_ave_min_max=0.9300000071525574, user_aired_to_ave_min_max=0.9300000071525574, user_liked_genres_multihot=[1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

#### 2.2 Save to Redis


In [None]:
from redis import Redis

In [None]:
import os

# Connection settings come from the environment instead of being hard-coded.
# The fallbacks equal redis-py's own defaults: localhost:6379, db 0, no password.
# NOTE: this name shadows the `redis` package; the save helpers below use it.
redis = Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    db=int(os.environ.get("REDIS_DB", "0")),
    password=os.environ.get("REDIS_PASSWORD"),
)

In [None]:
# Redis key prefixes; keys are written as "<prefix>:<id>".
(
    item_numeric_feature_prefix,
    user_numeric_feature_prefix,
    item_categorical_feature_prefix,
    user_categorical_feature_prefix,
) = (
    "rank:item:num",
    "rank:user:num",
    "rank:item:cat",
    "rank:user:cat",
)

In [None]:
def save_numeric_features(rows, features, idcol, prefix, client=None):
    """Write each row's numeric features to a Redis hash ``<prefix>:<row[idcol]>``.

    Args:
        rows: collected Spark Rows (anything supporting ``row[name]``).
        features: column names stored as hash fields.
        idcol: column whose value forms the key suffix.
        prefix: key prefix, e.g. ``rank:item:num``.
        client: Redis client to use; defaults to the notebook-level ``redis``.

    All writes go through one non-transactional pipeline instead of one round
    trip per row. Null feature values are skipped because redis-py rejects
    None. Rows with no non-null features are not written at all.
    """
    client = redis if client is None else client
    with client.pipeline(transaction=False) as pipe:
        for row in rows:
            mapping = {
                feat: row[feat] for feat in features if row[feat] is not None
            }
            if mapping:
                pipe.hset(f"{prefix}:{row[idcol]}", mapping=mapping)
        pipe.execute()

In [None]:
# Scaled item columns produced by `item_numeric_scaler` (named `<col>_min_max`).
# Deriving the list from item_numeric_cols keeps it in sync with the scaler.
item_num_features = [f"{colname}_min_max" for colname in item_numeric_cols]

save_numeric_features(
    item_features,
    item_num_features,
    "anime_id",
    item_numeric_feature_prefix,
)

In [None]:
# Scaled user columns produced by `user_numeric_scaler` (named `<col>_min_max`).
# Deriving the list from user_numeric_cols keeps it in sync with the scaler.
user_num_features = [f"{colname}_min_max" for colname in user_numeric_cols]

save_numeric_features(
    user_features,
    user_num_features,
    "user_id",
    user_numeric_feature_prefix,
)

In [None]:
import json


def list2str(l):
    """Serialize a list (e.g. a multi-hot vector) to a JSON string."""
    return json.dumps(l)


def save_categorical_features(rows, feature, idcol, prefix, client=None):
    """Write one JSON-encoded list feature per row to ``<prefix>:<row[idcol]>``.

    Args:
        rows: collected Spark Rows (anything supporting ``row[name]``).
        feature: column holding the list to store; also used as the hash field.
        idcol: column whose value forms the key suffix.
        prefix: key prefix, e.g. ``rank:item:cat``.
        client: Redis client to use; defaults to the notebook-level ``redis``.

    All writes go through one non-transactional pipeline instead of one round
    trip per row.
    """
    client = redis if client is None else client
    with client.pipeline(transaction=False) as pipe:
        for row in rows:
            pipe.hset(
                f"{prefix}:{row[idcol]}",
                mapping={feature: list2str(row[feature])},
            )
        pipe.execute()

In [None]:
# Item genre multi-hot vectors -> rank:item:cat:<anime_id>
save_categorical_features(
    rows=item_features,
    feature="genres_multihot",
    idcol="anime_id",
    prefix=item_categorical_feature_prefix,
)

In [None]:
# User liked-genres multi-hot vectors -> rank:user:cat:<user_id>
save_categorical_features(
    rows=user_features,
    feature="user_liked_genres_multihot",
    idcol="user_id",
    prefix=user_categorical_feature_prefix,
)

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()