# Test for PySpark

In [10]:
# # ! head -n 20 ../../data/trainingSamples.csv
# # configure pyspark
# import os
# import sys
# from datetime import datetime, date
# import pandas as pd
# from pyspark.sql import Row


# os.environ['PYSPARK_DRIVER_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3"
# os.environ['PYSPARK_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3"

# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("Spark").getOrCreate()


# df = spark.createDataFrame([
#     Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
#     Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
#     Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
# ])
# df.show()

# 0. EDA

In [2]:
# configure pyspark
import os
import sys
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import functions as F



os.environ['PYSPARK_DRIVER_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3"
os.environ['PYSPARK_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3"
data_path = "../../data/"

# Spark configuration
conf = SparkConf().setAppName('featureEngineering').setMaster('local')
# Create Spark instance
spark = SparkSession.builder.config(conf=conf).getOrCreate()


In [3]:
## alternative way to load csv files
#df = spark.read.format('csv').option('header',True).load(data_path+"movies.csv")
movie_df = spark.read.csv(data_path+"movies.csv", header=True)
movie_df.printSchema()
movie_df.show(5)

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
link_df = spark.read.csv(data_path+"links.csv", header=True)
link_df.printSchema()
link_df.show(5)

root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [5]:
rating_df = spark.read.csv(data_path+"ratings.csv", header=True)
rating_df.printSchema()
rating_df.show(5)

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



In [6]:
total_rating_count = rating_df.count()#.groupBy('movieId').
# show percentage of each genre
rating_df.groupBy('movieId').count().withColumn("Percentage", F.round( F.col("count")/total_rating_count,6)).show(10)
rating_df.where(F.col('rating')>=3.5).orderBy('rating', ascending=True).show(10)

+-------+-----+----------+
|movieId|count|Percentage|
+-------+-----+----------+
|    296|14616|  0.012507|
|    467|  174|   1.49E-4|
|    829|  402|   3.44E-4|
|    691|  254|   2.17E-4|
|    675|    6|    5.0E-6|
|    125|  788|   6.74E-4|
|    800| 1609|  0.001377|
|    944|  259|   2.22E-4|
|    853|   20|    1.7E-5|
|    451|  159|   1.36E-4|
+-------+-----+----------+
only showing top 10 rows

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    337|   3.5|1094785709|
|    11|    344|   3.5|1230783700|
|     1|    367|   3.5|1112485980|
|     1|     29|   3.5|1112484676|
|     1|    589|   3.5|1112485557|
|     1|     47|   3.5|1112484727|
|     1|    593|   3.5|1112484661|
|     1|    112|   3.5|1094785740|
|     1|    919|   3.5|1094785621|
|    11|     19|   3.5|1230783704|
+------+-------+------+----------+
only showing top 10 rows



In [7]:
percent = rating_df.groupBy('movieId').count().withColumn("Percentage", F.round( F.col("count")/total_rating_count,6))
rating_df.join( percent, on=['movieId'], how='left').show()

+-------+------+------+----------+-----+----------+
|movieId|userId|rating| timestamp|count|Percentage|
+-------+------+------+----------+-----+----------+
|    296|     1|   4.0|1112484767|14616|  0.012507|
|    296|     8|   5.0| 833973081|14616|  0.012507|
|    296|    11|   3.5|1230858799|14616|  0.012507|
|    296|    13|   5.0| 849082366|14616|  0.012507|
|    296|    15|   3.0| 840206642|14616|  0.012507|
|    296|    18|   4.0|1195573677|14616|  0.012507|
|    296|    21|   5.0| 992188845|14616|  0.012507|
|    296|    22|   5.0| 994638043|14616|  0.012507|
|    296|    23|   5.0| 914457789|14616|  0.012507|
|    296|    24|   5.0| 994071115|14616|  0.012507|
|    296|    25|   4.0|1277962554|14616|  0.012507|
|    296|    26|   5.0| 839270563|14616|  0.012507|
|    296|    28|   5.0| 834092660|14616|  0.012507|
|    296|    29|   5.0| 835561519|14616|  0.012507|
|    296|    32|   5.0| 845962251|14616|  0.012507|
|    296|    34|   3.0| 839249781|14616|  0.012507|
|    296|   

In [42]:
movie_df.createTempView("movies")

In [76]:
exploded_genres = spark.sql("select movieId ,explode(split(genres, '\\\\|')) as movie_genre from movies;")
exploded_genres.show()
exploded_genres.createOrReplaceTempView("exploded_genres")


+-------+-----------+
|movieId|movie_genre|
+-------+-----------+
|      1|  Adventure|
|      1|  Animation|
|      1|   Children|
|      1|     Comedy|
|      1|    Fantasy|
|      2|  Adventure|
|      2|   Children|
|      2|    Fantasy|
|      3|     Comedy|
|      3|    Romance|
|      4|     Comedy|
|      4|      Drama|
|      4|    Romance|
|      5|     Comedy|
|      6|     Action|
|      6|      Crime|
|      6|   Thriller|
|      7|     Comedy|
|      7|    Romance|
|      8|  Adventure|
+-------+-----------+
only showing top 20 rows



## Check the genres amount of each movie and the total genre amount

Note: in regular expression, `\|` means the string should match `|` .But in python , Java programming language, we need to add one more `\` to interpret the `\` in `\|`. So we need `\\|` here.

In [35]:

tmp = movie_df.withColumn("splitted_genres",F.explode(F.split(movie_df['genres'], "\\|"))).drop("genres")
tmp.show(10)
tmp.groupBy(["movieId"]).agg(F.count(F.lit("splitted_genres")).alias("genres_count")).orderBy(F.col('genres_count'), ascending=False).show()


+-------+--------------------+---------------+
|movieId|               title|splitted_genres|
+-------+--------------------+---------------+
|      1|    Toy Story (1995)|      Adventure|
|      1|    Toy Story (1995)|      Animation|
|      1|    Toy Story (1995)|       Children|
|      1|    Toy Story (1995)|         Comedy|
|      1|    Toy Story (1995)|        Fantasy|
|      2|      Jumanji (1995)|      Adventure|
|      2|      Jumanji (1995)|       Children|
|      2|      Jumanji (1995)|        Fantasy|
|      3|Grumpier Old Men ...|         Comedy|
|      3|Grumpier Old Men ...|        Romance|
+-------+--------------------+---------------+
only showing top 10 rows

+-------+------------+
|movieId|genres_count|
+-------+------------+
|    364|           6|
|    198|           6|
|    673|           6|
|    459|           6|
|    595|           6|
|    631|           6|
|    546|           6|
|     20|           5|
|    709|           5|
|    783|           5|
|    594|        

Check the total amount of movie genres

In [37]:
tmp.select("splitted_genres").distinct().count()

19

Convert multiple genres to Multi-OneHot

In [41]:
tmp.groupBy("movieId").pivot("splitted_genres").count().fillna(0).show()

+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|movieId|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|    467|     0|        0|        0|       0|     1|    0|          0|    0|      0|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|
|    829|     0|        0|        0|       0|     1|    0|          0|    0|      1|        0|     0|   0|      1|      0|      0|     0|       0|  0|      0|
|    296|     0|        0|        0|       0|     1|    1|          0|    1|      0|        0|     0|   0|      0|      0|      0|     0|       1|  0|      0|
|    675|     1|        0|        0|       0| 

## 1. Feature Engineering
### 1.1 Raw Data
#### Moive
+ movieId
+ movie_genres (concated list)
#### User
+ userId
+ user_movie_rating
+ timestamp of rating

#### Other
+  Link.csv
    - movieId
    - imdbId of movie
    - tmdbId of movie

### 1.2 Possible Transformed Features

#### Moive Profile
- movieId
- statistic variables
    - percentage of each movie (count of that movie / total count)
    - Averge / median rating of each movie (if rating feature exist )
    - variance of rating of each movie (how variant the rating would be )
- multi-onehot genres or selected genres
- Release Year
- Other contents (text, image, audio, other tags)

####  User Profile
- userId
- statistic data: it tells the personality(how often the user changes his/her preference)
    - Percentage of the amount of movies each user watchs
    - Averge/ median rating of each user (it tells the preference of rating of this user)
    - Variance of rating each user gives 
    - AvergeReleaseYear: measure user's preference on release year of movie (some users like new movies while others like old movies)
    - Release Year count
    
- Behavior data
    - **The genres each user visits/likes most frequently** (we can choose top k): it tells user's daily hobbies
    - The genres each user visits/likes recently according to timestamp:  it tells how user's prefernce changes based on given genres
    - **The movies each user visits/likes recently**: it tells how user's prefernce changes recently based on given movies
    - rating of users on movies
    - Discretize rating to binary label: 1(like), 0 (dislike). If rating >3, like, otherwise, dislke
    
- Other contents (other tags,like membership, registration date, etc, texts/comments each user posts, images each user frequently visists, ..)

#### Other Context Features

+ visit time
+ others (such as website, country, location...)




# 1. Movie Features

In [8]:
from pyspark.sql.types import IntegerType, StringType

def addRatingLabel(samples):
    total_count = samples.count()
    percentage = samples.groupBy("movieId").count().withColumnRenamed("count","movie_cnt").withColumn("Percentage",F.col("movie_cnt")/total_count)
    samples = samples.join(percentage, on=['movieId'], how='left')
    samples = samples.withColumn("label", F.when(F.col("rating")>3., 1).otherwise(0))
    return samples


label_df = addRatingLabel(rating_df)
label_df.show()

+-------+------+------+----------+---------+-------------------+-----+
|movieId|userId|rating| timestamp|movie_cnt|         Percentage|label|
+-------+------+------+----------+---------+-------------------+-----+
|    296|     1|   4.0|1112484767|    14616|0.01250686696821428|    1|
|    296|     8|   5.0| 833973081|    14616|0.01250686696821428|    1|
|    296|    11|   3.5|1230858799|    14616|0.01250686696821428|    1|
|    296|    13|   5.0| 849082366|    14616|0.01250686696821428|    1|
|    296|    15|   3.0| 840206642|    14616|0.01250686696821428|    0|
|    296|    18|   4.0|1195573677|    14616|0.01250686696821428|    1|
|    296|    21|   5.0| 992188845|    14616|0.01250686696821428|    1|
|    296|    22|   5.0| 994638043|    14616|0.01250686696821428|    1|
|    296|    23|   5.0| 914457789|    14616|0.01250686696821428|    1|
|    296|    24|   5.0| 994071115|    14616|0.01250686696821428|    1|
|    296|    25|   4.0|1277962554|    14616|0.01250686696821428|    1|
|    2

In [12]:
from pyspark.sql.types import *

def extractReleaseYearUdf(title):
    # add realease year
    if not title or len(title.strip()) < 6:
        return 1990
    else:
        yearStr = title.strip()[-5:-1]
    return int(yearStr)


def addMovieFea(movie_fea, rating_fea,round_num=2, use_MultiOneHot = False):
    #first use regular expression to convert list of genres to a list
    # then use explode function to expand the list
    
    # convert movie feature to onehot if enabled
    genres_cnt = movie_fea.withColumn("splitted_genres",F.explode(F.split(F.col('genres'), "\\|"))).groupBy('movieId').count()
    genres_cnt = genres_cnt.withColumnRenamed("count", "genres_cnt")
    
    movie_fea = movie_fea.join(genres_cnt, on="movieId", how="left")
    
    if use_MultiOneHot: 
        tmp = movie_fea.withColumn("splitted_genres",F.explode(F.split(F.col('genres'), "\\|"))).drop("genres")
        multi_onehot = tmp.groupBy("movieId").pivot("splitted_genres").count().fillna(0)
        # rename columns
        for c in multi_onehot.columns:
            if 'movie' not in c:
                multi_onehot = multi_onehot.withColumnRenamed(c, "genres_"+c)
        #multi_onehot.show()
        samples = movie_fea.drop('genres').join(multi_onehot, on= "movieId", how= "left")
    else:
        samples = movie_fea.withColumn("movieGenre1",F.split(F.col('genres'),"\\|")[0])\
                            .withColumn("movieGenre2",F.split(F.col('genres'),"\\|")[1])\
                            .withColumn("movieGenre3",F.split(F.col('genres'),"\\|")[2])
        samples = movie_fea
        
    
    samples = rating_fea.join(samples, on=['movieId'], how='left')
    # add releaseYear,title
    samples = samples.withColumn('releaseYear',
                                                       F.udf(extractReleaseYearUdf, IntegerType())('title')) \
        .withColumn('title', F.udf(lambda x: x.strip()[:-6].strip(), StringType())('title')) \
        .drop('title')
    
    
        
    # compute statistic for each movie: count, avg rating, std rating
    movie_stat = rating_fea.groupBy("movieId").agg(F.count(F.lit(1)).alias("movieRatingCount"), 
                                              F.format_number(F.avg(F.col("rating")), round_num).alias("movieAvgRating"), 
                                              F.format_number(F.stddev(F.col("rating")), round_num).alias("movieStdRating") ).fillna(0.)
    movie_fea = samples.join(movie_stat, on=["movieId"], how="left")
    
    return movie_fea
    
    
movie_fea = addMovieFea(movie_df, label_df,round_num=2)
movie_fea.show(10)
    
# addMovieFea(movie_df, label_df,round_num=2, use_MultiOneHot=False)

+-------+------+------+----------+---------+-------------------+-----+--------------------+----------+-----------+----------------+--------------+--------------+
|movieId|userId|rating| timestamp|movie_cnt|         Percentage|label|              genres|genres_cnt|releaseYear|movieRatingCount|movieAvgRating|movieStdRating|
+-------+------+------+----------+---------+-------------------+-----+--------------------+----------+-----------+----------------+--------------+--------------+
|    296|     1|   4.0|1112484767|    14616|0.01250686696821428|    1|Comedy|Crime|Dram...|         4|       1994|           14616|          4.17|          0.98|
|    296|     8|   5.0| 833973081|    14616|0.01250686696821428|    1|Comedy|Crime|Dram...|         4|       1994|           14616|          4.17|          0.98|
|    296|    11|   3.5|1230858799|    14616|0.01250686696821428|    1|Comedy|Crime|Dram...|         4|       1994|           14616|          4.17|          0.98|
|    296|    13|   5.0| 8490

### Note for using PySpark
+ function().over(sql.Window.partitionBy(...).orderBy(..).rowBetween(...)): 用于新增window function的feature column


# 2. User Features
+ add user profile features here

In [128]:
from pyspark.sql.functions import *
from pyspark import sql
from collections import defaultdict

NUMBER_PRECISION= 2

def extractSortedGenres(genres_list):
    """
    input: a list of concatenated genres string like ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"]
    output: a list of genres sorted by frequency of genre ['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
    example:
        if we have a list of (genre, frequency) ,like (('Thriller',2),('Action',1),('Sci-Fi',1),('Horror', 1), ('Adventure',1),('Crime',1))
        then we sort it in descending order and return ['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
    """
    genre_ls = defaultdict(int) 
    for genres in  genres_list:
        for genre in genres.split('|'):
            genre_ls[genre] += 1
    # genre_ls.item() = (key=genre, value=count)        
    # return sorted list, not dictionary!
    sorted_genres = sorted(genre_ls.items(), key=lambda x:x[1], reverse=True )
    # return list of genre
    return [ g[0] for g in sorted_genres]
    
    


def addUserFea(samplesWithMovieFea, round_number = 2):
    """
    input:
        samplesWithMovieFea: Spark DataFrame with movie features
        round_num: precision number 
    output:
        dataframe with extracted user features
    """
    # extract behavior features
    extractSortedGenres_udf = F.udf(extractSortedGenres, ArrayType(StringType()))
    # add user statistic: Rating count, AverageRating, Rating Stddev,  AverageReleaseYear, ReleaseYearStddev
    # use window function to add new feature column and each user has the same value in this column
    # the first line equivalent to   select count() over (partition by userId, order by timestemp)
    
    #    samplesWithUserFea.filter(samplesWithMovieFea['userId'] == 1).orderBy(F.col('timestamp').asc()).show(truncate=False)
    #   samplesWithUserFea.where(F.col("userId") == 2).show()
    
    
    #  Behavior data:
    #  The genres each user visits/likes most frequently (we can choose top k): it tells user's daily hobbies
    #  The genres each user visits/likes recently according to timestamp: it tells how user's prefernce changes based on given genres
    #  The movies each user visits/likes recently:
    
    
    samples = samplesWithMovieFea.withColumn("userRatingCnt", F.count(F.lit(1))\
                                             .over(sql.Window.partitionBy('userId')\
                                                   .orderBy('timestamp').rowsBetween(-100,-1))) \
                                 .withColumn("userAvgRating", format_number(F.avg("rating")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number))\
                                 .withColumn("userRatingStddev", format_number(F.stddev("rating")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number)) \
                                 .withColumn("userReleaseYearStddev", format_number(F.stddev("releaseYear")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number)) \
                                 .withColumn("userAvgReleaseYear", format_number(F.avg("releaseYear")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number).cast(IntegerType()))\
                                 .withColumn("userActiveMovies", F.collect_list(when(F.col("label")==1, F.col("movieId")).otherwise(F.lit(None)))\
                                             .over(sql.Window.partitionBy("userId") \
                                             .orderBy("timestamp").rowsBetween(-100,-1)))\
                                 .withColumn("userRatedMovie1", F.col("userActiveMovies")[0])\
                                 .withColumn("userRatedMovie2", F.col("userActiveMovies")[1])\
                                 .withColumn("userRatedMovie3", F.col("userActiveMovies")[2])\
                                 .withColumn("userRatedMovie4", F.col("userActiveMovies")[3])\
                                 .withColumn("userRatedMovie5", F.col("userActiveMovies")[4])\
                                 .withColumn("userGenres", extractSortedGenres_udf(F.collect_list(when(F.col('label') == 1, F.col('genres')).otherwise(F.lit(None)))\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1))))\
                                 .withColumn("userGenre1", F.col("userGenres")[0])\
                                 .withColumn("userGenre2", F.col("userGenres")[1])\
                                 .withColumn("userGenre3", F.col("userGenres")[2])\
                                 .withColumn("userGenre4", F.col("userGenres")[3])\
                                 .withColumn("userGenre5", F.col("userGenres")[4])\
                                 .drop("userActiveMovies","userGenres","genres")\
                                 .filter(F.col("userRatingCnt")>1) # remove the  users who watch movies once or even don't watch movie
    samples.printSchema()
    samples.show(5)

    return samples


# def extractGenres(genres_list):
#     '''
#     pass in a list which format like ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"]
#     count by each genre，return genre_list in reverse order
#     eg:
#     (('Thriller',2),('Action',1),('Sci-Fi',1),('Horror', 1), ('Adventure',1),('Crime',1))
#     return:['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
#     '''
#     genres_dict = defaultdict(int)
#     for genres in genres_list:
#         for genre in genres.split('|'):
#             genres_dict[genre] += 1
#     sortedGenres = sorted(genres_dict.items(), key=lambda x: x[1], reverse=True)
#     return [x[0] for x in sortedGenres]


# def addUserFeatures(samplesWithMovieFeatures):
#     extractGenresUdf = F.udf(extractGenres, ArrayType(StringType()))
#     samplesWithUserFeatures = samplesWithMovieFeatures \
#         .withColumn('userPositiveHistory',
#                     F.collect_list(when(F.col('label') == 1, F.col('movieId')).otherwise(F.lit(None))).over(
#                         sql.Window.partitionBy("userId").orderBy(F.col("timestamp")).rowsBetween(-100, -1))) \
#         .withColumn("userPositiveHistory", reverse(F.col("userPositiveHistory"))) \
#         .withColumn('userRatedMovie1', F.col('userPositiveHistory')[0]) \
#         .withColumn('userRatedMovie2', F.col('userPositiveHistory')[1]) \
#         .withColumn('userRatedMovie3', F.col('userPositiveHistory')[2]) \
#         .withColumn('userRatedMovie4', F.col('userPositiveHistory')[3]) \
#         .withColumn('userRatedMovie5', F.col('userPositiveHistory')[4]) \
#         .withColumn('userRatingCount',
#                     F.count(F.lit(1)).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))) \
#         .withColumn('userAvgReleaseYear', F.avg(F.col('releaseYear')).over(
#         sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)).cast(IntegerType())) \
#         .withColumn('userReleaseYearStddev', F.stddev(F.col("releaseYear")).over(
#         sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))) \
#         .withColumn("userAvgRating", format_number(
#         F.avg(F.col("rating")).over(sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)),
#         NUMBER_PRECISION)) \
#         .withColumn("userRatingStddev", F.stddev(F.col("rating")).over(
#         sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1))) \
#         .withColumn("userGenres", extractGenresUdf(
#         F.collect_list(when(F.col('label') == 1, F.col('genres')).otherwise(F.lit(None))).over(
#             sql.Window.partitionBy('userId').orderBy('timestamp').rowsBetween(-100, -1)))) \
#         .withColumn("userRatingStddev", format_number(F.col("userRatingStddev"), NUMBER_PRECISION)) \
#         .withColumn("userReleaseYearStddev", format_number(F.col("userReleaseYearStddev"), NUMBER_PRECISION)) \
#         .withColumn("userGenre1", F.col("userGenres")[0]) \
#         .withColumn("userGenre2", F.col("userGenres")[1]) \
#         .withColumn("userGenre3", F.col("userGenres")[2]) \
#         .withColumn("userGenre4", F.col("userGenres")[3]) \
#         .withColumn("userGenre5", F.col("userGenres")[4]) \
#         .drop("genres", "userGenres", "userPositiveHistory") \
#         .filter(F.col("userRatingCount") > 1)
#     samplesWithUserFeatures.printSchema()
#     samplesWithUserFeatures.show(10)
#     samplesWithUserFeatures.filter(samplesWithMovieFeatures['userId'] == 1).orderBy(F.col('timestamp').asc()).show(
#         truncate=False)
#     return samplesWithUserFeatures


In [129]:
addUserFea(movie_fea).show()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- movie_cnt: long (nullable = true)
 |-- Percentage: double (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres_cnt: long (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieStdRating: string (nullable = true)
 |-- userRatingCnt: long (nullable = false)
 |-- userAvgRating: string (nullable = true)
 |-- userRatingStddev: string (nullable = true)
 |-- userReleaseYearStddev: string (nullable = true)
 |-- userAvgReleaseYear: integer (nullable = true)
 |-- userRatedMovie1: string (nullable = true)
 |-- userRatedMovie2: string (nullable = true)
 |-- userRatedMovie3: string (nullable = true)
 |-- userRatedMovie4: string (nullable = true)
 |-- userRatedMovie5: string (nullable = true)
 |-- userGenre1: stri

In [138]:
# addUserFeatures(movie_fea).show()
samples = addUserFea(movie_fea)
samples.sample(0.1).count()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- movie_cnt: long (nullable = true)
 |-- Percentage: double (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres_cnt: long (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieStdRating: string (nullable = true)
 |-- userRatingCnt: long (nullable = false)
 |-- userAvgRating: string (nullable = true)
 |-- userRatingStddev: string (nullable = true)
 |-- userReleaseYearStddev: string (nullable = true)
 |-- userAvgReleaseYear: integer (nullable = true)
 |-- userRatedMovie1: string (nullable = true)
 |-- userRatedMovie2: string (nullable = true)
 |-- userRatedMovie3: string (nullable = true)
 |-- userRatedMovie4: string (nullable = true)
 |-- userRatedMovie5: string (nullable = true)
 |-- userGenre1: stri

111001

In [170]:
# df = samples.sample(0.1).withColumn("timestampLong", F.col("timestamp").cast(LongType()))
# quantile = df.stat.approxQuantile("timestampLong", [0.8], 0.05)
# quantile.show()

In [171]:
# help(df.stat.approxQuantile)
# df.repartition(5).show()
# df.repartition(1).sample(0.0001).write.csv("../../data/processed_data/samples",header=True, mode="overwrite")
# df.repartition(1).sample(0.0001).write.csv("../../data/samples.csv",mode='overwrite')
# ! rm -rf ../../data/processed_data/samples.csv

# 3. Concatenate features and Get training/testing Samples

In [None]:
def SampleTrainTestData(samples, sample_rate=0.1, save_path ="../../data/processed_data/"):
    """
    This function is to sample a small amount of samples from the huge dataset,
    then it splits the sampled dataset according timestamp
    For example, in a range of timestamp from 1second to 10000 second,80% samples are before 1000sec 
    and 20% samples are after 1000sec, we taks those 80% samples as training set and 20% samples as test set.
    This simulates the real world setting: we use data collected before a date and use data collected after this date
    to test the model.
    
    """
    samples = samples.sample(sample_rate).withColumn("timestamplong", F.col("timestamp").cast(LongType()))
    # approximate 80% quantile with 0.05 tolerance
    quantile = samples.stat.approxQuantile("timestamplong", [0.8], 0.05)
    timestamp_boundary = quantile[0]
    training_samples = samples.where(F.col("timestamplong")<=timestamp_boundary).drop("timestamplong")
    test_samples = samples.where(F.col("timestamplong")>timestamp_boundary).drop("timestamplong")
    train_file = save_path + "train.csv"
    test_file = save_path + "test.csv"
    
    # save files
    # repartition(1) is to amke all saved samples in the same csv file
    training_samples.repartition(1).write.option("header","true").mode("overwrite").csv(train_file)
    test_samples.repartition(1).write.option("header","true").mode("overwrite").csv(test_file)
    ## or equivalently
    # training_samples.write.csv(train_file,header=True, mode="overwrite")
    # test_samples.write.csv(test_file,header=True, mode="overwrite")
    return

In [172]:
train_df = spark.read.csv("../../data/processed_data/train.csv",header=True)
train_df.show()

+-------+------+------+----------+---------+--------------------+-----+----------+-----------+----------------+--------------+--------------+-------------+-------------+----------------+---------------------+------------------+---------------+---------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+
|movieId|userId|rating| timestamp|movie_cnt|          Percentage|label|genres_cnt|releaseYear|movieRatingCount|movieAvgRating|movieStdRating|userRatingCnt|userAvgRating|userRatingStddev|userReleaseYearStddev|userAvgReleaseYear|userRatedMovie1|userRatedMovie2|userRatedMovie3|userRatedMovie4|userRatedMovie5|userGenre1|userGenre2|userGenre3|userGenre4|userGenre5|
+-------+------+------+----------+---------+--------------------+-----+----------+-----------+----------------+--------------+--------------+-------------+-------------+----------------+---------------------+------------------+---------------+---------------+---------------

# 4. Other Feature Engineering Operation in Spark

In [66]:
%%writefile FeatureEng_util.py

from pyspark import SparkConf, SparkContext

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, QuantileDiscretizer, MinMaxScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *


def OneHotEncoding(samplesWithMovieFea):
    """
    convert movieId to OneHot representation
    
    Note: 
        1. input column to onehot must be numeric type
        2. onehot output column format:  (number of onehot columns, [one_hot position], [value]), or (number of onehot columns, {position:value})
    """
    # convert movieId to int type
    samplesWithMovieFea = samplesWithMovieFea.withColumn("movieId", F.col("movieId").cast(IntegerType()))
    enc = OneHotEncoder(inputCol="movieId", outputCol="movieIdVector",dropLast=False)
    samplesWithMovieFea = enc.fit(samplesWithMovieFea).transform(samplesWithMovieFea)
    samplesWithMovieFea.show(5)
    
    return samplesWithMovieFea

def Array2Vec(genreIndices, indexSize):
    """
    Convert Array type feature to a vector
    input:
        input are the features in one row. These features include:
            genreIndices: a list of numeric indice of genres
            indexSize: size of index
    output: 
        converted multi-onehot array for each row in table
    """
    genreIndices.sort()
    # a list of values to fill into muulti-one-hot array
    fill_list = [1.0 for i in range(len(genreIndices)) ]
    
    # sparse take  size of vector, positions in vector to fill, values used to fill these position
    return Vectors.sparse(indexSize, genreIndices, fill_list)


def MultiOneHotEncoding(samplesWithMovieFea):
    """
    convert genres of each movie to multi-onehot represenatation
    Step:
        1. first explode the genres for each movie into multiple rows
        2. use StringIndexer to convert genres into genreIndex in numerical label format
        3. obtain size of index
        4. aggregate the genreIndex list to get genreIndex array list for each movie 
        5. Convert genreIndex array list into multi-onehot
    """
    # split genre list for each movie and convert string to string list. Then explode each genre into one row
    samplesWithMovieFea = samplesWithMovieFea.withColumn("genre", F.explode(F.split("genres","\\|").cast(ArrayType(StringType()))))
    # Convert string list to numerical index label list in each row
    indexer = StringIndexer(inputCol="genre",outputCol= "genreIndex")
    indexModel= indexer.fit(samplesWithMovieFea)
    samplesWithMovieFea = indexModel.transform(samplesWithMovieFea)
    print("String labels: ")
    print(indexModel.labels)
    #indexer.save("StringIndices.csv")
    # obtain size of index and add indexSize to a new column
    indexSize = samplesWithMovieFea.agg(F.max("genreIndex")).head()[0] + 1
    
    samplesWithMovieFea = samplesWithMovieFea.groupBy("movieId").agg(F.collect_list("genreIndex").alias("genreIndices"))
    samplesWithMovieFea = samplesWithMovieFea.withColumn("IndexSize", F.lit(indexSize))
    
    # Convert index list in each row into multi-onehot vector
    Array2Vec_UDF = udf(Array2Vec, VectorUDT())
    samplesWithMovieFea = samplesWithMovieFea.withColumn("MultiOneHot_Vector", Array2Vec_UDF(F.col("genreIndices"), F.col("IndexSize")) ).orderBy("movieId")
    
    
    return samplesWithMovieFea


def MultiOneHot_v2(sampleWithMovieFea):
    """
    This function convert genre array list to Multi-OneHot using Pivot function
    Use Pivot and array method to convert genres to multi-onehot, without using StringIndexer
    """
    def arr2vec(arr):
        indexSize = len(arr)
        pos = [i for i in range(len(arr)) if arr[i]==1 ]
        fill_ls = [1]*len(pos)
        return Vectors.sparse(indexSize, pos, fill_ls)
    
    arr2vec_udf = udf(arr2vec, VectorUDT())
    tmp = sampleWithMovieFea.withColumn("splitted_genres",F.explode(F.split(F.col('genres'), "\\|"))).drop("genres")
    genre_ls = [s.splitted_genres for s in  tmp.select("splitted_genres").distinct().collect()]
    genre_ls.sort()
    print(genre_ls )
    
    multi_onehot = tmp.groupBy("movieId").pivot("splitted_genres").count().fillna(0)
    ##rename columns
    #for c in multi_onehot.columns:
    #    if 'movie' not in c:
    #        multi_onehot = multi_onehot.withColumnRenamed(c, "genres_"+c)
    #multi_onehot.show()
    
    columns = [F.col(c) for c in genre_ls]
    multi_onehot = multi_onehot.withColumn("Genres_Vector",arr2vec_udf(F.array(genre_ls))).drop(*genre_ls)
    samples = sampleWithMovieFea.drop('genres').join(multi_onehot, on= "movieId", how= "left").orderBy("movieId")
    return samples

def ratingDiscretizer(samplesWithRating):
    """
    This function adds statistic features of rating and
    discretize the rating feature using Binning. Then it normalize rating feature
    using MinMaxScalar
    """
    samplesWithRating.printSchema()
    # compute statistic for each movie
    samplesWithRating  = samplesWithRating.groupBy("movieId").agg(F.avg("rating").alias("AvgRating"),
                                                                  F.variance("rating").alias("RatingVar"),
                                                                 F.count(F.lit(1)).alias("ratingCnt"))
    
    # we need to convert average rating value to a dense Vector with User Define Type (UDT)
    # udf(lambda x: Vectors.dense(x), VectorUDT()):  take  dense vector as input,  VectorUDT() type vector as output
    samplesWithRating = samplesWithRating.withColumn('avgRatingVec', udf(lambda x: Vectors.dense(x), VectorUDT())('avgRating'))
    print()
    print("Dense Vector")
    samplesWithRating.show(5)
    
    #Bucket and discretize the rating
    ratingDiscretizer = QuantileDiscretizer(inputCol="ratingCnt", outputCol="ratingCntBucket", numBuckets= 100)
    
    # MinMaxScaler
    scaler = MinMaxScaler(inputCol="avgRatingVec", outputCol="ScaledAvgRating")
    pipe = Pipeline(stages = [ratingDiscretizer, scaler])
    TransformedSamples = pipe.fit(samplesWithRating).transform(samplesWithRating)
    TransformedSamples.show(5)
    
    return TransformedSamples

def test(data_path):
    # Spark configuration
    conf = SparkConf().setAppName('featureEngineering').setMaster('local')
    # Create Spark instance
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    movie_df = spark.read.csv(data_path+"movies.csv", header=True)
    movie_df = OneHotEncoding(movie_df)
#     movie_df = MultiOneHot_v2(movie_df)
    movie_df = MultiOneHotEncoding(movie_df)
    movie_df.printSchema()
    movie_df.show(5)
    
    
    #movie_df.show(5)
    link_df = spark.read.csv(data_path+"links.csv", header=True)
    link_df.printSchema()
    #link_df.show(5)

    rating_df = spark.read.csv(data_path+"ratings.csv", header=True)
    rating_df.printSchema()
    #rating_df.show(5)
    rating_df = ratingDiscretizer(rating_df)
    rating_df.printSchema()


if __name__ == "__main__":
#     os.environ['PYSPARK_DRIVER_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3" # path to python exec file
#     os.environ['PYSPARK_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3" #path to python exec file
    data_path = "../../data/"
    test(data_path)
    
    
    

Overwriting FeatureEng_util.py


# 5. Write Source code to Python

In [67]:
%%writefile FeatureEngineering.py

# configure pyspark
import os
import sys
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import functions as F

from pyspark.sql.types import *
from pyspark.sql.types import IntegerType, StringType

from pyspark.sql.functions import *
from pyspark import sql
from collections import defaultdict

def addRatingLabel(samples):
    total_count = samples.count()
    percentage = samples.groupBy("movieId").count().withColumnRenamed("count","movie_cnt").withColumn("Percentage",F.col("movie_cnt")/total_count)
    samples = samples.join(percentage, on=['movieId'], how='left')
    samples = samples.withColumn("label", F.when(F.col("rating")>3., 1).otherwise(0))
    return samples



def extractReleaseYearUdf(title):
    # add realease year
    if not title or len(title.strip()) < 6:
        return 1990
    else:
        yearStr = title.strip()[-5:-1]
    return int(yearStr)


def addMovieFea(movie_fea, rating_fea,round_num=2, use_MultiOneHot = False):
    #first use regular expression to convert list of genres to a list
    # then use explode function to expand the list
    
    # convert movie feature to onehot if enabled
    genres_cnt = movie_fea.withColumn("splitted_genres",F.explode(F.split(F.col('genres'), "\\|"))).groupBy('movieId').count()
    genres_cnt = genres_cnt.withColumnRenamed("count", "genres_cnt")
    
    movie_fea = movie_fea.join(genres_cnt, on="movieId", how="left")
    
    if use_MultiOneHot: 
        tmp = movie_fea.withColumn("splitted_genres",F.explode(F.split(F.col('genres'), "\\|"))).drop("genres")
        multi_onehot = tmp.groupBy("movieId").pivot("splitted_genres").count().fillna(0)
        # rename columns
        for c in multi_onehot.columns:
            if 'movie' not in c:
                multi_onehot = multi_onehot.withColumnRenamed(c, "genres_"+c)
        #multi_onehot.show()
        samples = movie_fea.drop('genres').join(multi_onehot, on= "movieId", how= "left")
    else:
        samples = movie_fea.withColumn("movieGenre1",F.split(F.col('genres'),"\\|")[0])\
                            .withColumn("movieGenre2",F.split(F.col('genres'),"\\|")[1])\
                            .withColumn("movieGenre3",F.split(F.col('genres'),"\\|")[2])
        samples = movie_fea
        
    
    samples = rating_fea.join(samples, on=['movieId'], how='left')
    # add releaseYear,title
    samples = samples.withColumn('releaseYear',
                                                       F.udf(extractReleaseYearUdf, IntegerType())('title')) \
        .withColumn('title', F.udf(lambda x: x.strip()[:-6].strip(), StringType())('title')) \
        .drop('title')
    
    
        
    # compute statistic for each movie: count, avg rating, std rating
    movie_stat = rating_fea.groupBy("movieId").agg(F.count(F.lit(1)).alias("movieRatingCount"), 
                                              F.format_number(F.avg(F.col("rating")), round_num).alias("movieAvgRating"), 
                                              F.format_number(F.stddev(F.col("rating")), round_num).alias("movieStdRating") ).fillna(0.)
    movie_fea = samples.join(movie_stat, on=["movieId"], how="left")
    
    return movie_fea
    
    





def extractSortedGenres(genres_list):
    """
    input: a list of concatenated genres string like ["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"]
    output: a list of genres sorted by frequency of genre ['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
    example:
        if we have a list of (genre, frequency) ,like (('Thriller',2),('Action',1),('Sci-Fi',1),('Horror', 1), ('Adventure',1),('Crime',1))
        then we sort it in descending order and return ['Thriller','Action','Sci-Fi','Horror','Adventure','Crime']
    """
    genre_ls = defaultdict(int) 
    for genres in  genres_list:
        for genre in genres.split('|'):
            genre_ls[genre] += 1
    # genre_ls.item() = (key=genre, value=count)        
    # return sorted list, not dictionary!
    sorted_genres = sorted(genre_ls.items(), key=lambda x:x[1], reverse=True )
    # return list of genre
    return [ g[0] for g in sorted_genres]
    
    


def addUserFea(samplesWithMovieFea, round_number = 2):
    """
    input:
        samplesWithMovieFea: Spark DataFrame with movie features
        round_num: precision number 
    output:
        dataframe with extracted user features
    """
    # extract behavior features
    extractSortedGenres_udf = F.udf(extractSortedGenres, ArrayType(StringType()))
    # add user statistic: Rating count, AverageRating, Rating Stddev,  AverageReleaseYear, ReleaseYearStddev
    # use window function to add new feature column and each user has the same value in this column
    # the first line equivalent to   select count() over (partition by userId, order by timestemp)
    
    #    samplesWithUserFea.filter(samplesWithMovieFea['userId'] == 1).orderBy(F.col('timestamp').asc()).show(truncate=False)
    #   samplesWithUserFea.where(F.col("userId") == 2).show()
    
    
    #  Behavior data:
    #  The genres each user visits/likes most frequently (we can choose top k): it tells user's daily hobbies
    #  The genres each user visits/likes recently according to timestamp: it tells how user's prefernce changes based on given genres
    #  The movies each user visits/likes recently:
    
    
    samples = samplesWithMovieFea.withColumn("userRatingCnt", F.count(F.lit(1))\
                                             .over(sql.Window.partitionBy('userId')\
                                                   .orderBy('timestamp').rowsBetween(-100,-1))) \
                                 .withColumn("userAvgRating", format_number(F.avg("rating")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number))\
                                 .withColumn("userRatingStddev", format_number(F.stddev("rating")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number)) \
                                 .withColumn("userReleaseYearStddev", format_number(F.stddev("releaseYear")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number)) \
                                 .withColumn("userAvgReleaseYear", format_number(F.avg("releaseYear")\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1)), round_number).cast(IntegerType()))\
                                 .withColumn("userActiveMovies", F.collect_list(when(F.col("label")==1, F.col("movieId")).otherwise(F.lit(None)))\
                                             .over(sql.Window.partitionBy("userId") \
                                             .orderBy("timestamp").rowsBetween(-100,-1)))\
                                 .withColumn("userRatedMovie1", F.col("userActiveMovies")[0])\
                                 .withColumn("userRatedMovie2", F.col("userActiveMovies")[1])\
                                 .withColumn("userRatedMovie3", F.col("userActiveMovies")[2])\
                                 .withColumn("userRatedMovie4", F.col("userActiveMovies")[3])\
                                 .withColumn("userRatedMovie5", F.col("userActiveMovies")[4])\
                                 .withColumn("userGenres", extractSortedGenres_udf(F.collect_list(when(F.col('label') == 1, F.col('genres')).otherwise(F.lit(None)))\
                                             .over(sql.Window.partitionBy("userId")\
                                                   .orderBy('timestamp').rowsBetween(-100,-1))))\
                                 .withColumn("userGenre1", F.col("userGenres")[0])\
                                 .withColumn("userGenre2", F.col("userGenres")[1])\
                                 .withColumn("userGenre3", F.col("userGenres")[2])\
                                 .withColumn("userGenre4", F.col("userGenres")[3])\
                                 .withColumn("userGenre5", F.col("userGenres")[4])\
                                 .drop("userActiveMovies","userGenres","genres")\
                                 .filter(F.col("userRatingCnt")>1) # remove the  users who watch movies once or even don't watch movie
    samples.printSchema()
    samples.show(5)

    return samples



def SampleTrainTestDataByTime(samples, sample_rate=0.1, save_path ="../../data/processed_data/"):
    """
    This function is to sample a small amount of samples from the huge dataset,
    then it splits the sampled dataset according timestamp
    For example, in a range of timestamp from 1second to 10000 second,80% samples are before 1000sec 
    and 20% samples are after 1000sec, we taks those 80% samples as training set and 20% samples as test set.
    This simulates the real world setting: we use data collected before a date and use data collected after this date
    to test the model.
    
    """
    samples = samples.sample(sample_rate).withColumn("timestamplong", F.col("timestamp").cast(LongType()))
    # approximate 80% quantile with 0.05 tolerance
    quantile = samples.stat.approxQuantile("timestamplong", [0.8], 0.05)
    timestamp_boundary = quantile[0]
    training_samples = samples.where(F.col("timestamplong")<=timestamp_boundary).drop("timestamplong")
    test_samples = samples.where(F.col("timestamplong")>timestamp_boundary).drop("timestamplong")
    train_file = save_path + "train.csv"
    test_file = save_path + "test.csv"
    
    # save files
    # repartition(1) is to amke all saved samples in the same csv file
    training_samples.repartition(1).write.option("header","true").mode("overwrite").csv(train_file)
    test_samples.repartition(1).write.option("header","true").mode("overwrite").csv(test_file)
    training_samples.toPandas().to_csv(test_file,header=True,index=False)
    test_samples.toPandas().to_csv(test_file,header=True,index=False)
    ## or equivalently
    # training_samples.write.csv(train_file,header=True, mode="overwrite")
    # test_samples.write.csv(test_file,header=True, mode="overwrite")
    return



if __name__=="__main__":
    os.environ['PYSPARK_DRIVER_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3" # path to python exec file
    os.environ['PYSPARK_PYTHON']="/home/wenkanw/.conda/envs/mlenv/bin/python3" #path to python exec file
    data_path = "../../data/"

    # Spark configuration
    conf = SparkConf().setAppName('featureEngineering').setMaster('local')
    # Create Spark instance
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    movie_df = spark.read.csv(data_path+"movies.csv", header=True)
    movie_df.printSchema()
    #movie_df.show(5)
    link_df = spark.read.csv(data_path+"links.csv", header=True)
    link_df.printSchema()
    #link_df.show(5)

    rating_df = spark.read.csv(data_path+"ratings.csv", header=True)
    rating_df.printSchema()
    #rating_df.show(5)

    label_df = addRatingLabel(rating_df)
    #label_df.show()
    movie_fea = addMovieFea(movie_df, label_df,round_num=2)
    movie_fea.show(10)
    transformed_samples = addUserFea(movie_fea)
    SampleTrainTestDataByTime(transformed_samples, sample_rate=0.1, save_path ="../../data/processed_data/")
    
    
    

Overwriting FeatureEngineering.py


In [168]:
# !python ./FeatureEngineering.py
!python3 --version

Python 3.8.5


# 5. Conclusion about some Tricks
+ Data we can explore from raw data
   - Atribute and Label Data       
       - Statistic data (count, average, Stddev, etc)
       - categorical labels (like tags)
       - numerical features
   - Behavior Data
       - 0/1 like or dislike
       - Rating
       - favourite
       - Sequential Data (sequence of item rated along time)
   - Relationship Data
       - Links among users and items in a graph 
       - Sequential Data (sequence of item rated along time), relationship among items visited by the same user
   - Content Data
       - Image
       - Text
       - Audio
       - Video
       - etc
+ **Usages of some functions**
   - Vector.sparse()
       - We either input a list of positions and a list of values to fill in vector, or a dictionary in format {position:value}
       - Vector.sparse(indexSize,positions_in_vector , values_to_fill_in_vector)
       - Vector.sparse(indexSize, {position: value})
   - Window function in Spark
       - `withColumn("..", Func(col(..)).over(sql.Window.partitionBy(col(...)).orderBy(col(..)).rowsBetween(..,..)) )` 其中 Func()可以是 sum(), count()等操作函数
   - Save Dataframe to file
       - `df.repartition(n).write.csv(file_name, header=True,mode='overwrite')`. 
       - .repartition(n): partition dataframe and save it into n csv files 
       - .over(...): it is equivalent to the 'over' keyword in SQL
   - **Spark UDF** 
       -  使用Spark UDF时， UDF函数的输入是table的某个column， 输入形式是一个row， 或者如果用了collect_list()函数作为输入，输入就是一个list of rows.输出是对应那个row的新column的值


       
+ **Debug Notes:**
   - 当PySpark Query很长时，尽量一句一句跑和加上去来debug，看一下有没有漏括号
   - `sorted(dictionary, key=..)`函数返回的是一个元素格式为[key， value]的list，
       