In [1]:
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import *
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *
from collections import defaultdict
from pyspark.sql import functions as F

# Spark/Hive settings for the YARN session; each entry is passed to builder.config().
# NOTE(review): with dynamic allocation, minExecutors=10 exceeds executor.instances=6,
# so the latter is likely ineffective — confirm which value is intended.
# NOTE(review): 'spark.sql.execution.arrow.enabled' is the pre-3.0 key name; on Spark 3.x
# the equivalent is 'spark.sql.execution.arrow.pyspark.enabled' — confirm Spark version.
_spark_settings = {
    'spark.executor.memory': '4g',
    'spark.executor.cores': '4',
    'spark.driver.memory': '4g',
    'spark.dynamicAllocation.minExecutors': '10',
    'spark.dynamicAllocation.maxExecutors': '50',
    'spark.executor.instances': 6,
    'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version': '2',
    'spark.sql.execution.arrow.enabled': 'false',
    'spark.driver.maxResultSize': '5g',
    'spark.sql.shuffle.partitions': '2000',
    'spark.default.parallelism': '50',
    'hive.exec.orc.split.strategy': 'ETL',
}

_session_builder = SparkSession.builder.master('yarn')
for _conf_key, _conf_value in _spark_settings.items():
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.appName('movie_lens_feature').enableHiveSupport().getOrCreate()

In [2]:
# Local copy of the SparrowRecSys sample data (read through the file:// scheme).
SAMPLE_DATA_DIR = 'file:////home/yeyuel/git-repo/SparrowRecSys/src/main/resources/webroot/sampledata'
movie_path = SAMPLE_DATA_DIR + '/movies.csv'
rating_path = SAMPLE_DATA_DIR + '/ratings.csv'

In [3]:
# Both CSVs have a header row. No schema is supplied, so every column is loaded as a string.
csv_reader = spark.read.options(header='true').format('csv')
movie_samples, rating_samples = (csv_reader.load(csv_path) for csv_path in (movie_path, rating_path))

In [4]:
# (number of movies, number of ratings)
tuple(frame.count() for frame in (movie_samples, rating_samples))

(982, 1168638)

In [5]:
tuple(map(type, (movie_samples, rating_samples)))

(pyspark.sql.dataframe.DataFrame, pyspark.sql.dataframe.DataFrame)

In [6]:
# First five movie rows, rendered through pandas.
movie_samples.limit(num=5).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [7]:
# First five rating rows, rendered through pandas.
rating_samples.limit(num=5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,1112486027
1,1,29,3.5,1112484676
2,1,32,3.5,1112484819
3,1,47,3.5,1112484727
4,1,50,3.5,1112484580


In [8]:
def add_sample_label(rating_samples: DataFrame, positive_threshold: float = 3.5):
    """Print the rating distribution and attach a binary ``label`` column.

    Args:
        rating_samples: ratings DataFrame containing a ``rating`` column.
        positive_threshold: ratings ``>=`` this value get ``label`` 1, all others 0.
            Defaults to 3.5.

    Returns:
        ``rating_samples`` with an extra integer ``label`` column.

    Side effects:
        Prints the input schema and a per-rating count / percentage table.
    """
    rating_samples.printSchema()
    sample_count = rating_samples.count()
    (rating_samples.groupBy('rating')
        .count()
        .orderBy('rating')
        .withColumn('percentage', F.col('count') / sample_count)
        .show())
    # `rating` is loaded as a string column here; Spark casts it implicitly for the comparison.
    return rating_samples.withColumn(
        'label', F.when(F.col('rating') >= positive_threshold, 1).otherwise(0))

rating_samples_with_label = add_sample_label(rating_samples=rating_samples)

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+------+--------------------+
|rating| count|          percentage|
+------+------+--------------------+
|   0.5|  9788|0.008375561978987506|
|   1.0| 45018| 0.03852176636392108|
|   1.5| 11794|0.010092090108314123|
|   2.0| 87084| 0.07451751526135553|
|   2.5| 34269|0.029323879593167432|
|   3.0|323616| 0.27691723185451783|
|   3.5| 74376| 0.06364331811904114|
|   4.0|324804|  0.2779337998593234|
|   4.5| 53388| 0.04568395003414231|
|   5.0|204501| 0.17499088682722966|
+------+------+--------------------+



In [9]:
# Sample of rows at or above the positive-label threshold.
rating_samples_with_label.filter("rating >= 3.5").limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp,label
0,1,2,3.5,1112486027,1
1,1,29,3.5,1112484676,1
2,1,32,3.5,1112484819,1
3,1,47,3.5,1112484727,1
4,1,50,3.5,1112484580,1


In [10]:
# Sample of rows below the positive-label threshold.
rating_samples_with_label.filter("rating < 3.5").limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp,label
0,1,653,3.0,1094785691,0
1,2,242,3.0,974820776,0
2,2,469,3.0,974820598,0
3,2,891,2.0,974820969,0
4,3,24,3.0,945176048,0


In [13]:
# Decimal places passed to F.format_number for the rating / release-year statistics.
NUMBER_PRECISION: int = 2

def extract_release_year(title, default_year=1990):
    """Parse the release year from a MovieLens-style title such as ``'Toy Story (1995)'``.

    The year is read from the four characters just before the last character of
    the stripped title, i.e. the ``YYYY`` in a trailing ``(YYYY)``.

    Args:
        title: movie title; may be ``None``.
        default_year: value returned when no year can be parsed. Defaults to 1990.

    Returns:
        The parsed year as an ``int``. Returns ``default_year`` when ``title`` is
        empty or ``None``, when it is shorter than 6 characters after stripping, or
        when the expected year slice is not an integer.
    """
    if not title:
        return default_year
    stripped = title.strip()
    if len(stripped) < 6:
        return default_year
    try:
        return int(stripped[-5:-1])
    except ValueError:
        # Titles without a trailing "(YYYY)" must not raise here: this runs inside
        # a Spark UDF, where one exception fails the whole job.
        return default_year
    

def add_movie_features(movie_samples: DataFrame, rating_samples_with_label):
    """Join movie metadata onto labelled ratings and add per-movie statistics.

    Columns added:
      * ``releaseYear``: parsed from ``title`` with ``extract_release_year``.
      * ``movieGenre1``..``movieGenre3``: the first three entries of the
        ``|``-separated ``genres`` string. The value is null when a movie has
        fewer genres.
      * ``movieRatingCount``, ``movieAvgRating``, ``movieRatingStddev``: the
        count, mean and standard deviation of ``rating`` per ``movieId``.
        - The mean and stddev are strings formatted to ``NUMBER_PRECISION``
          decimals.
        - A missing stddev (e.g. a movie with a single rating) is filled with 0.

    ``title`` is dropped from the result. The resulting schema is printed.

    Args:
        movie_samples: movies DataFrame with ``movieId``, ``title`` and ``genres``.
        rating_samples_with_label: ratings DataFrame with ``movieId`` and ``rating``.

    Returns:
        The left-joined DataFrame with the feature columns listed above.
    """
    release_year_udf = F.udf(extract_release_year, IntegerType())
    samples_with_movies = (
        rating_samples_with_label
        .join(movie_samples, on=['movieId'], how='left')
        .withColumn('releaseYear', release_year_udf('title'))
        # `title` is only needed to derive the release year.
        .drop('title')
    )

    genre_parts = F.split(F.col('genres'), '\\|')
    samples_with_genres = (
        samples_with_movies
        .withColumn('movieGenre1', genre_parts[0])
        .withColumn('movieGenre2', genre_parts[1])
        .withColumn('movieGenre3', genre_parts[2])
    )

    movie_rating_stats = (
        samples_with_genres
        .groupBy('movieId')
        .agg(F.count(F.lit(1)).alias('movieRatingCount'),
             F.format_number(F.avg(F.col('rating')), NUMBER_PRECISION).alias('movieAvgRating'),
             F.stddev(F.col('rating')).alias('movieRatingStddev'))
        .fillna(0)
        .withColumn('movieRatingStddev', F.format_number(F.col('movieRatingStddev'), NUMBER_PRECISION))
    )

    samples_with_movie_stats = samples_with_genres.join(movie_rating_stats, on=['movieId'], how='left')
    samples_with_movie_stats.printSchema()
    return samples_with_movie_stats


samples_with_movie_features = add_movie_features(
    movie_samples=movie_samples,
    rating_samples_with_label=rating_samples_with_label,
)
samples_with_movie_features.limit(num=5).toPandas()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- genres: string (nullable = true)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)



Unnamed: 0,movieId,userId,rating,timestamp,label,genres,releaseYear,movieGenre1,movieGenre2,movieGenre3,movieRatingCount,movieAvgRating,movieRatingStddev
0,47,1,3.5,1112484727,1,Mystery|Thriller,1995,Mystery,Thriller,,9335,4.06,0.87
1,32,1,3.5,1112484819,1,Mystery|Sci-Fi|Thriller,1995,Mystery,Sci-Fi,Thriller,9694,3.89,0.86
2,50,1,3.5,1112484580,1,Crime|Mystery|Thriller,1995,Crime,Mystery,Thriller,10221,4.35,0.75
3,29,1,3.5,1112484676,1,Adventure|Drama|Fantasy|Mystery|Sci-Fi,1995,Adventure,Drama,Fantasy,1859,3.92,0.97
4,2,1,3.5,1112486027,1,Adventure|Children|Fantasy,1995,Adventure,Children,Fantasy,4853,3.21,0.96


In [27]:
from collections import defaultdict
def extract_genres(genres_list):
    """Rank genres by how often they appear across ``|``-separated genre strings.

    Genres are ordered by descending count. Ties keep the order in which each
    genre was first seen. ``None`` or empty entries are skipped, and so is a
    ``None`` list, because the Spark ``genres`` column feeding this UDF is
    nullable.

    Example:
        >>> extract_genres(["Action|Adventure|Sci-Fi|Thriller", "Crime|Horror|Thriller"])
        ['Thriller', 'Action', 'Adventure', 'Sci-Fi', 'Crime', 'Horror']

    Args:
        genres_list: iterable of genre strings such as ``"Action|Adventure"``, or ``None``.

    Returns:
        A list of distinct genre names, most frequent first.
    """
    genre_counts = defaultdict(int)
    for genres in genres_list or ():
        if not genres:
            continue
        for genre in genres.split('|'):
            genre_counts[genre] += 1
    # sorted() is stable and dicts keep insertion order, so ties stay in first-seen order.
    ranked = sorted(genre_counts.items(), key=lambda item: item[1], reverse=True)
    return [genre for genre, _ in ranked]

def add_user_features(samples_with_movie_features, history_size=100):
    """Add per-user features computed from each user's earlier ratings.

    Each row gets a window over the same ``userId``. The window covers the
    previous ``history_size`` rows, ordered by numeric ``timestamp``, and
    excludes the current row. From that window the function computes:

      * ``userRatedMovie1``..``userRatedMovie5``: the most recent
        positively-labelled ``movieId`` values, newest first.
      * ``userRatingCount``: the number of rows in the window.
      * ``userAvgReleaseYear`` (int), plus ``userReleaseYearStddev``,
        ``userAvgRating`` and ``userRatingStddev``. The last three are strings
        formatted to ``NUMBER_PRECISION`` decimals.
      * ``userGenre1``..``userGenre5``: genres of positively-labelled movies in
        the window, ranked by ``extract_genres``.

    ``genres`` is dropped. Rows whose window holds at most one earlier rating
    (``userRatingCount <= 1``) are filtered out. The resulting schema is printed.

    Args:
        samples_with_movie_features: output of ``add_movie_features``.
        history_size: number of preceding rows the window spans. Defaults to 100.

    Returns:
        The DataFrame with the user feature columns appended.
    """
    extract_genres_udf = F.udf(extract_genres, ArrayType(StringType()))
    # `timestamp` is loaded from CSV as a string. Ordering by the string puts
    # 9-digit epochs (before Sep 2001) after 10-digit ones and scrambles the
    # history, so order numerically instead.
    history_window = (sql.Window.partitionBy('userId')
                      .orderBy(F.col('timestamp').cast('long'))
                      .rowsBetween(-history_size, -1))
    is_positive = F.col('label') == 1

    samples = samples_with_movie_features.withColumn(
        'userPositiveHistory',
        F.reverse(F.collect_list(F.when(is_positive, F.col('movieId'))).over(history_window)))
    for i in range(5):
        samples = samples.withColumn('userRatedMovie{}'.format(i + 1), F.col('userPositiveHistory')[i])

    samples = (
        samples
        .withColumn('userRatingCount', F.count(F.lit(1)).over(history_window))
        .withColumn('userAvgReleaseYear',
                    F.avg(F.col('releaseYear')).over(history_window).cast(IntegerType()))
        .withColumn('userReleaseYearStddev',
                    F.format_number(F.stddev(F.col('releaseYear')).over(history_window), NUMBER_PRECISION))
        .withColumn('userAvgRating',
                    F.format_number(F.avg(F.col('rating')).over(history_window), NUMBER_PRECISION))
        .withColumn('userRatingStddev',
                    F.format_number(F.stddev(F.col('rating')).over(history_window), NUMBER_PRECISION))
        .withColumn('userGenres',
                    extract_genres_udf(F.collect_list(F.when(is_positive, F.col('genres'))).over(history_window)))
    )
    for i in range(5):
        samples = samples.withColumn('userGenre{}'.format(i + 1), F.col('userGenres')[i])

    samples_with_user_features = (
        samples
        .drop('genres', 'userGenres', 'userPositiveHistory')
        .filter(F.col('userRatingCount') > 1)
    )
    samples_with_user_features.printSchema()
    return samples_with_user_features

samples_with_user_features = add_user_features(samples_with_movie_features)
# Filter on this DataFrame's own `userId` column rather than a column object taken
# from `samples_with_movie_features`. Sort numerically because `timestamp` is a string.
(samples_with_user_features
    .filter(F.col('userId') == 1)
    .orderBy(F.col('timestamp').cast('long').asc())
    .toPandas())

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- releaseYear: integer (nullable = true)
 |-- movieGenre1: string (nullable = true)
 |-- movieGenre2: string (nullable = true)
 |-- movieGenre3: string (nullable = true)
 |-- movieRatingCount: long (nullable = true)
 |-- movieAvgRating: string (nullable = true)
 |-- movieRatingStddev: string (nullable = true)
 |-- userRatedMovie1: string (nullable = true)
 |-- userRatedMovie2: string (nullable = true)
 |-- userRatedMovie3: string (nullable = true)
 |-- userRatedMovie4: string (nullable = true)
 |-- userRatedMovie5: string (nullable = true)
 |-- userRatingCount: long (nullable = false)
 |-- userAvgReleaseYear: integer (nullable = true)
 |-- userReleaseYearStddev: string (nullable = true)
 |-- userAvgRating: string (nullable = true)
 |-- userRatingStddev: string (nullable = true)
 |-- use

Unnamed: 0,movieId,userId,rating,timestamp,label,releaseYear,movieGenre1,movieGenre2,movieGenre3,movieRatingCount,...,userRatingCount,userAvgReleaseYear,userReleaseYearStddev,userAvgRating,userRatingStddev,userGenre1,userGenre2,userGenre3,userGenre4,userGenre5
0,653,1,3.0,1094785691,0,1996,Action,Adventure,Fantasy,3148,...,2,1953,20.51,3.5,0.0,Adventure,Drama,Sci-Fi,Children,Fantasy
1,337,1,3.5,1094785709,1,1993,Drama,,,3889,...,3,1967,28.5,3.33,0.29,Adventure,Drama,Sci-Fi,Children,Fantasy
2,151,1,4.0,1094785734,1,1995,Action,Drama,Romance,2767,...,4,1974,26.5,3.38,0.25,Adventure,Drama,Sci-Fi,Children,Fantasy
3,112,1,3.5,1094785740,1,1995,Action,Adventure,Comedy,2577,...,5,1978,24.79,3.5,0.35,Drama,Adventure,Sci-Fi,Children,Fantasy
4,50,1,3.5,1112484580,1,1995,Crime,Mystery,Thriller,10221,...,6,1981,23.21,3.5,0.32,Adventure,Drama,Action,Sci-Fi,Children
5,541,1,4.0,1112484603,1,1982,Action,Sci-Fi,Thriller,6635,...,7,1983,21.84,3.5,0.29,Adventure,Drama,Action,Crime,Sci-Fi
6,593,1,3.5,1112484661,1,1991,Crime,Horror,Thriller,13692,...,8,1982,20.22,3.56,0.32,Adventure,Drama,Action,Sci-Fi,Crime
7,29,1,3.5,1112484676,1,1995,Adventure,Drama,Fantasy,1859,...,9,1983,19.11,3.56,0.3,Adventure,Drama,Action,Crime,Thriller
8,293,1,4.0,1112484703,1,1994,Action,Crime,Drama,5587,...,10,1984,18.36,3.55,0.28,Adventure,Drama,Sci-Fi,Action,Crime
9,47,1,3.5,1112484727,1,1995,Mystery,Thriller,,9335,...,11,1985,17.64,3.59,0.3,Drama,Adventure,Action,Crime,Thriller


In [33]:
def split_and_save_training_test_samples(samples_with_user_features, file_path, weights=(0.8, 0.2), seed=None):
    """Randomly split samples into training and test sets and write each as CSV.

    Writes ``<file_path>/training_samples`` and ``<file_path>/test_samples``.
    Existing output is overwritten. Each split is written with a header row,
    repartitioned to a single partition.

    Args:
        samples_with_user_features: DataFrame to split.
        file_path: output directory prefix (a local or Hadoop-style URI).
        weights: relative sizes of the (training, test) splits; must have two entries.
            Defaults to (0.8, 0.2).
        seed: seed for ``randomSplit``. Pass an int for a reproducible split.
            With ``None`` (the default), the split differs between runs.
    """
    training, test = samples_with_user_features.randomSplit(weights, seed)
    for split_df, sub_dir in ((training, 'training_samples'), (test, 'test_samples')):
        # A single partition produces one CSV part file per split directory.
        split_df.repartition(1).write.option('header', 'true').mode('overwrite').csv(file_path + '/' + sub_dir)
    
# Destination for the training/test CSV directories.
OUTPUT_DIR = 'file:///home/yeyuel/code-prac/dl-prac/project/tutorial/data/recommend/ml-latest-small'
split_and_save_training_test_samples(samples_with_user_features, file_path=OUTPUT_DIR)