In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.sql.functions import expr

In [2]:
spark = SparkSession.builder.appName("ALSmodel").config('spark.driver.memory','16G').getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/05 11:31:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# yarn mode
spark = SparkSession\
        .builder\
        .master("yarn")\
        .config('spark.executor.instances','10')\
        .config('spark.executor.memory','8G')\
        .config('spark.driver.memory','2G')\
        .appName("modelTraining0723")\
        .getOrCreate()

In [None]:
spark

##### Read raw dataset

###### df_ratings

In [8]:
df_ratings = spark.read.csv("file:///home/yutinglin/ml-25m/ratings.csv", inferSchema=True, header=True)

# df_ratings.show()
df_ratings.describe().show()



+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         25000095|          25000095|          25000095|            25000095|
|   mean|81189.28115381162|21387.981943268616| 3.533854451353085|1.2156014431215513E9|
| stddev|46791.71589745776| 39198.86210105973|1.0607439611423535| 2.268758080595386E8|
|    min|                1|                 1|               0.5|           789652009|
|    max|           162541|            209171|               5.0|          1574327703|
+-------+-----------------+------------------+------------------+--------------------+



                                                                                

In [9]:
df_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



###### df_ratings_new

In [10]:
# df.withColumn("salary",col("salary")*100).show()
from pyspark.sql.functions import *

df_ratings_new = df_ratings.withColumn("rating_new",col("rating")/5)
df_ratings_new.show()
# df_ratings_new.printSchema()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|rating_new|
+------+-------+------+----------+----------+
|     1|    296|   5.0|1147880044|       1.0|
|     1|    306|   3.5|1147868817|       0.7|
|     1|    307|   5.0|1147868828|       1.0|
|     1|    665|   5.0|1147878820|       1.0|
|     1|    899|   3.5|1147868510|       0.7|
|     1|   1088|   4.0|1147868495|       0.8|
|     1|   1175|   3.5|1147868826|       0.7|
|     1|   1217|   3.5|1147878326|       0.7|
|     1|   1237|   5.0|1147868839|       1.0|
|     1|   1250|   4.0|1147868414|       0.8|
|     1|   1260|   3.5|1147877857|       0.7|
|     1|   1653|   4.0|1147868097|       0.8|
|     1|   2011|   2.5|1147868079|       0.5|
|     1|   2012|   2.5|1147868068|       0.5|
|     1|   2068|   2.5|1147869044|       0.5|
|     1|   2161|   3.5|1147868609|       0.7|
|     1|   2351|   4.5|1147877957|       0.9|
|     1|   2573|   4.0|1147878923|       0.8|
|     1|   2632|   5.0|1147878248|

In [None]:
numerator = df_ratings.count()
numerator

In [None]:
users = df_ratings.select("userId").distinct().count()
users

In [None]:
movies = df_ratings.select("movieId").distinct().count()
movies

In [None]:
denominator = users * movies
denominator

In [None]:
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: ", sparsity*100 , "%")

In [11]:
#需要叢集
df_ratings_ps= df_ratings.toPandas()
R_df = df_ratings_ps.pivot(index = 'userId', columns ='movieId', values = 'rating')



MemoryError: Unable to allocate 71.5 GiB for an array with shape (162541, 59047) and data type float64

In [None]:
# Number of ratings in matrix
numerator = df_ratings.count()
# Distinct users and movies
users = df_ratings.select("userId").distinct().count()
movies = df_ratings.select("movieId").distinct().count()
# Number of ratings matrix could contain if no empty cells
denominator = users * movies
#Calculating sparsity
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: "), sparsity

###### df_movies

In [None]:
df_movies = spark.read.csv("file:///home/yutinglin/ml-25m/movies.csv", inferSchema=True, header=True)

df_movies.describe().show()

###### df_pre

In [None]:
# df_pre = spark.read.csv("file:///home/yutinglin/ml-25m/preference.csv", inferSchema=True, header=True)
# df_pre = df_pre.drop('_c0')
# df_pre.describe().show()

In [None]:
# df_pre.printSchema()

In [None]:
# df_pre.write.parquet('file:///home/yutinglin/ml-25m/df_pre.csv', mode='overwrite')


In [None]:
df_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_pre.csv') 

###### df_zscore

In [3]:
df_zscore = spark.read.csv("file:///home/yutinglin/ml-25m/zscore.csv", inferSchema=True, header=True)

df_zscore.describe().show()



+-------+-----------------+------------------+--------------------+--------------------+
|summary|           userId|           movieId|           timestamp|              zscore|
+-------+-----------------+------------------+--------------------+--------------------+
|  count|         25000095|          25000095|            25000095|            25000095|
|   mean|81189.28115381162|21387.981943268616|1.2156014431215518E9|-1.14208089891811...|
| stddev|46791.71589745487|39198.862101060164|2.2687580805953485E8|  1.0000000199999284|
|    min|                1|                 1|           789652009| -2.8601195228702334|
|    max|           162541|            209171|          1574327703|  1.3821861181191855|
+-------+-----------------+------------------+--------------------+--------------------+



                                                                                

###### df_boxcox

In [14]:
df_boxcox = spark.read.csv("file:///home/yutinglin/ml-25m/boxcox.csv", inferSchema=True, header=True)

df_boxcox.describe().show()



+-------+-----------------+------------------+-------------------+
|summary|           userId|           movieId|              rc_bc|
+-------+-----------------+------------------+-------------------+
|  count|         25000095|          25000095|           25000095|
|   mean|81189.28115381162|21387.981943268616| 4.2903387630443115|
| stddev|46791.71589745411|39198.862101059785|  2.099199282879676|
|    min|                1|                 1|-0.4190929316066643|
|    max|           162541|            209171|  7.563974611605411|
+-------+-----------------+------------------+-------------------+



                                                                                

##### Subset1: df_after2015

In [None]:
#transform timestamp
from pyspark.sql.functions import *
from pyspark.sql.types import StringType,DoubleType,IntegerType

df_ratings_year = df_ratings.withColumn('year',from_unixtime(col('timestamp'),"yyyy"))



# df_ratings.withColumn('real_time',from_unixtime(col('timestamp'),"MM-dd-yyyy HH:mm:ss").alias('timestamp')).show()
# df_ratings.select(from_unixtime(col('timestamp'),"yyyy").alias('timestamp')).show()
# df_ratings.withColumn('real_time',from_unixtime('timestamp')).show()
# df_ratings.withColumn('real_time',fn.from_unixtime('timestamp').cast(DataType())).show()

In [None]:
df_ratings_year.show(5)

In [None]:
df_after2015 = df_ratings_year.filter(df_ratings_year['year'] >= 2015)

In [None]:
df_after2015.describe().show()

In [None]:
df_after2015.write.parquet('file:///home/yutinglin/ml-25m/df_after2015.csv', mode='overwrite')


##### Subset2 : df_HF_movies

In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy('movieId')

df_movie_count= df_ratings.select('movieId','userId','rating',f.count('movieId').over(w).alias('count')).sort('movieId')
# groupby('movieId').count().select('movieId',f.col('count').alias
df_HF_movies =df_movie_count.filter(df_movie_count['count']>=100)
df_HF_movies.describe().show()

In [None]:
df_HF_movies.write.parquet('file:///home/yutinglin/ml-25m/df_HF_movies.csv', mode='overwrite')


##### Subset3 : df_HF_users

In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('userId')

df_user_count= df_ratings.select('movieId','userId','rating',f.count('userId').over(w).alias('count')).sort('userId')
# groupby('movieId').count().select('movieId',f.col('count').alias
df_HF_users =df_user_count.filter(df_user_count['count']>=50)
df_HF_users.describe().show()

In [None]:
df_HF_users.write.parquet('file:///home/yutinglin/ml-25m/df_HF_users.csv', mode='overwrite')


In [None]:
#count the number of rating at each movie
count = df_ratings.groupby('movieId').count()

count.show(5)

In [None]:
#filter movies with 100 or more ratings
#weird join
df_HF_movies =df_count.join(df_ratings, df_count['movieId'] == df_ratings['movieId'], 'left')\
.filter(df_count['count']>=100)
# .select(df_count["movieId"],df_count["count"], df_ratings["userId"], df_ratings["rating"])\
# .select(rec["movieId"], rec["userId"], rec["prediction"], df_movies["title"],df_movies["genres"])

df_HF_movies.describe().show()

In [None]:
# Pandas UDF
import pandas as pd
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

In [None]:
#return year column
@pandas_udf('int')
def HF_movies(movieId: pd.Series) -> pd.Series:
    if movieId in df_count['movieId']:
        return 'a'
    else:
        return 'b'

# count_udf = udf(lambda movie: 1 if movieId in df_count['movieId'] else 0, IntegerType())
df_ratings.withColumn("HF_movie", HF_movies(df_ratings['movieId'])).show(10)

# spark.sql('''select count(StockCode) from rtTable''').show()

#2015以後的使用者
# @pandas_udf(IntegerType())
# def after_2015(timestamp):
    

# df_HF_movies =df_count.join(df_ratings, df_count['movieId'] == df_ratings['movieId'], 'inner')\
#     .filter(df_count['count']>=100)
    
# df_HF_movies.describe().show()

#movies with 100 or more ratings

##### Subset4 : df_preference

In [3]:
df_normalized = spark.read.csv("file:///home/yutinglin/ml-25m/ratings_variance.csv", inferSchema=True, header=True)

# df_normalized.show()
df_normalized = df_normalized.drop('_c0')
df_normalized.describe().show()



+-------+-----------------+------------------+--------------------+--------------------+------------------+
|summary|           userId|           movieId|           timestamp|     gauss_normalize|          variance|
+-------+-----------------+------------------+--------------------+--------------------+------------------+
|  count|         25000095|          25000095|            25000095|            24984248|          25000095|
|   mean|81189.28115381162|21387.981943268616|1.2156014431215518E9|-4.16608287726484...|0.9022676609016025|
| stddev| 46791.7158974552| 39198.86210106045|2.2687580805953774E8| 0.08059933882609976|0.4775038982124392|
|    min|                1|                 1|           789652009| -0.9993748045653336|               0.0|
|    max|           162541|            209171|          1574327703|    0.99365072945774| 5.328947368421052|
+-------+-----------------+------------------+--------------------+--------------------+------------------+



                                                                                

In [4]:
df_normalized_lowVar =df_normalized.filter(df_normalized['variance']==0)
df_normalized_lowVar.count()

                                                                                

15847

In [5]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

df_normalized.printSchema()


root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- gauss_normalize: double (nullable = true)
 |-- variance: double (nullable = true)



In [6]:
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
df_normalized = df_normalized.select("userId","movieId",to_vector("variance").alias("variance_vec"))
df_normalized.printSchema()


root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- variance_vec: vector (nullable = true)



In [7]:
from pyspark.ml.feature import MinMaxScaler
mmScaler = MinMaxScaler(inputCol='variance_vec',outputCol='variance_vec_scaled')
model = mmScaler.fit(df_normalized)
model.transform(df_normalized).show()
df_var_scaled=model.transform(df_normalized)

                                                                                

+------+-------+-------------------+--------------------+
|userId|movieId|       variance_vec| variance_vec_scaled|
+------+-------+-------------------+--------------------+
|     1|    296|[1.008488612836439]|[0.18924724586560...|
|     1|    306|[1.008488612836439]|[0.18924724586560...|
|     1|    307|[1.008488612836439]|[0.18924724586560...|
|     1|    665|[1.008488612836439]|[0.18924724586560...|
|     1|    899|[1.008488612836439]|[0.18924724586560...|
|     1|   1088|[1.008488612836439]|[0.18924724586560...|
|     1|   1175|[1.008488612836439]|[0.18924724586560...|
|     1|   1217|[1.008488612836439]|[0.18924724586560...|
|     1|   1237|[1.008488612836439]|[0.18924724586560...|
|     1|   1250|[1.008488612836439]|[0.18924724586560...|
|     1|   1260|[1.008488612836439]|[0.18924724586560...|
|     1|   1653|[1.008488612836439]|[0.18924724586560...|
|     1|   2011|[1.008488612836439]|[0.18924724586560...|
|     1|   2012|[1.008488612836439]|[0.18924724586560...|
|     1|   206

In [8]:
df_var_scaled.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- variance_vec: vector (nullable = true)
 |-- variance_vec_scaled: vector (nullable = true)



In [9]:
from pyspark.sql.types import FloatType
to_float = udf(lambda x : float(x[0]),FloatType())
df_var_scaled = df_var_scaled.select("userId","movieId",to_float("variance_vec_scaled").alias("rating_var"))
df_var_scaled.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating_var: float (nullable = true)



In [10]:
df_var_scaled.describe().show()



+-------+-----------------+------------------+-------------------+
|summary|           userId|           movieId|         rating_var|
+-------+-----------------+------------------+-------------------+
|  count|         25000095|          25000095|           25000095|
|   mean|81189.28115381162|21387.981943268616|0.16931442522548906|
| stddev| 46791.7158974552| 39198.86210106045| 0.0896056697583036|
|    min|                1|                 1|                0.0|
|    max|           162541|            209171|                1.0|
+-------+-----------------+------------------+-------------------+



                                                                                

In [11]:
df_var_scaled.summary().show()



+-------+-----------------+------------------+-------------------+
|summary|           userId|           movieId|         rating_var|
+-------+-----------------+------------------+-------------------+
|  count|         25000095|          25000095|           25000095|
|   mean|81189.28115381162|21387.981943268616|0.16931442522548906|
| stddev| 46791.7158974552| 39198.86210106045| 0.0896056697583036|
|    min|                1|                 1|                0.0|
|    25%|            40511|              1197|        0.106688865|
|    50%|            80902|              2947|         0.15228799|
|    75%|           121553|              8623|         0.21209832|
|    max|           162541|            209171|                1.0|
+-------+-----------------+------------------+-------------------+



                                                                                

In [None]:
var_scaled_no =df_var_scaled.filter(df_var_scaled['rating_var']==0)
var_scaled_no.count()

In [None]:
# df_var_scaled.join(df_ratings, df_var_scaled.userId == df_ratings.userId, 'inner')\
# .select(df_var_scaled["userId"], df_var_scaled["movieId"],df_ratings["rating"], df_var_scaled["rating_var"])\
# .show()

df_preference = df_var_scaled.join(df_ratings, \
                                   (df_var_scaled.userId == df_ratings.userId) & (df_var_scaled.movieId== df_ratings.movieId),\
                                   'inner')\
.select(df_var_scaled["userId"], df_var_scaled["movieId"],df_ratings["rating"], df_var_scaled["rating_var"])\
.filter((df_var_scaled['rating_var'] >= 0.1) & (df_var_scaled['rating_var'] <= 0.22))

df_preference.describe().show()


# rec.join(df_movies, rec.movieId == df_movies.movieId, 'inner')\
# .select(rec["movieId"], rec["userId"], rec["prediction"], df_movies["title"],df_movies["genres"])\
# .distinct()\
# .orderBy(rec["prediction"],ascending=False)\
# .show()


In [None]:
df_preference.write.parquet('file:///home/yutinglin/ml-25m/df_preference.csv', mode='overwrite')


##### Subset5 : df_after2015_pre

In [None]:
#transform timestamp
from pyspark.sql.functions import *
from pyspark.sql.types import StringType,DoubleType,IntegerType

df_prefer_year = df_pre.withColumn('year',from_unixtime(col('timestamp'),"yyyy"))


In [None]:
df_prefer_year.show(5)

In [None]:
df_after2015_pre = df_prefer_year.filter(df_prefer_year['year'] >= 2015)

In [None]:
df_after2015_pre.describe().show()

In [None]:
df_after2015_pre.write.parquet('file:///home/yutinglin/ml-25m/df_after2015_pre.csv', mode='overwrite')


##### Subset6 : df_HF_movies_pre

In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy('movieId')

df_movie_count= df_pre.select('movieId','userId','prefer',f.count('movieId').over(w).alias('count')).sort('movieId')
# groupby('movieId').count().select('movieId',f.col('count').alias
df_HF_movies_pre =df_movie_count.filter(df_movie_count['count']>=100)
df_HF_movies_pre.describe().show()

In [None]:
df_HF_movies_pre.write.parquet('file:///home/yutinglin/ml-25m/df_HF_movies_pre.csv', mode='overwrite')


##### Subset7 : df_HF_users_pre

In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('userId')

df_user_count= df_pre.select('movieId','userId','prefer',f.count('userId').over(w).alias('count')).sort('userId')
# groupby('movieId').count().select('movieId',f.col('count').alias
df_HF_users_pre =df_user_count.filter(df_user_count['count']>=50)
df_HF_users_pre.describe().show()

In [None]:
df_HF_users_pre.write.parquet('file:///home/yutinglin/ml-25m/df_HF_users_pre.csv', mode='overwrite')


##### Subset8 : df_preference_pre

In [None]:
type(df_pre)

In [None]:
df_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_pre.csv') 
df_pre.describe().show()

In [None]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

df_pre.printSchema()


In [None]:
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
df_pre = df_pre.select("userId","movieId",'prefer',to_vector("variance").alias("variance_vec"))
df_pre.printSchema()


In [None]:
from pyspark.ml.feature import MinMaxScaler
mmScaler = MinMaxScaler(inputCol='variance_vec',outputCol='variance_vec_scaled')
model = mmScaler.fit(df_pre)
model.transform(df_pre).show()
df_var_scaled=model.transform(df_pre)

In [None]:
df_var_scaled.printSchema()

In [None]:
from pyspark.sql.types import FloatType
to_float = udf(lambda x : float(x[0]),FloatType())
df_var_scaled = df_var_scaled.select("userId","movieId",'prefer',to_float("variance_vec_scaled").alias("prefer_var"))
df_var_scaled.printSchema()

In [None]:
df_var_scaled.show()

In [None]:
df_var_scaled.describe().show()

In [None]:
df_var_scaled.summary().show()

In [None]:
var_scaled_no =df_var_scaled.filter(df_var_scaled['prefer_var']==0)
var_scaled_no.count()

In [None]:
df_preference_pre = df_var_scaled.filter((df_var_scaled['prefer_var'] >= 0.87) & (df_var_scaled['prefer_var'] <= 0.93))

df_preference_pre.describe().show()

# join(df_pre, \
#                                    (df_var_scaled.userId == df_pre.userId) & (df_var_scaled.movieId== df_pre.movieId),\
#                                    'inner')\
# .select(df_var_scaled["userId"], df_var_scaled["movieId"],df_pre["prefer"], df_var_scaled["prefer_var"])\


In [None]:
df_preference_pre.write.parquet('file:///home/yutinglin/ml-25m/df_preference_pre.csv', mode='overwrite')


##### Model training(general)

###### df_zscore

In [4]:
(train, test) = df_zscore.randomSplit([0.8,0.2],seed=42)

als= ALS(rank=35,\
         maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="zscore",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model = als.fit(train)

                                                                                

In [5]:
pred = model.transform(test)

pred.show()

# pred_test = pred.filter(pred.prediction != float('nan'))

                                                                                

+------+-------+----------+--------------------+-----------+
|userId|movieId| timestamp|              zscore| prediction|
+------+-------+----------+--------------------+-----------+
|     3| 175197|1566089493|-0.03191576221062093|0.043685682|
|    12|    471|1167582388| 0.43945153123264785| 0.27573675|
|    12|   1580|1167582669|-0.03191576221062093|0.027434805|
|    13|   1580|1238029138|-0.03191576221062093|  0.1844796|
|    20|   1580|1155082328| 0.43945153123264785|  0.5758398|
|    41|   1580| 944572899| 0.43945153123264785| 0.23786783|
|    41|   2366| 944572005| -0.5032830556538898| 0.10774833|
|    72|    471| 982866659|  1.3821861181191855| 0.13874339|
|    72|   1342| 980622798| 0.43945153123264785|        0.0|
|    72|   1580| 980642551| 0.43945153123264785| 0.14990088|
|    72|   1591| 980643638| -1.4460176425404272|        0.0|
|    72|   1645| 980622971| 0.43945153123264785|0.038670823|
|    72|   3918| 980623106| -0.5032830556538898|        0.0|
|    86|   1580| 9454614

In [7]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="zscore", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="zscore", predictionCol="prediction")

rmse = eval_rmse.evaluate(pred)
r2 = eval_r2.evaluate(pred)
print(f"RMSE:{rmse}")
print(f"R square:{r2}")



RMSE:0.9125571076803415
R square:0.16770472381390178


                                                                                

###### df_ratings_new

In [11]:
(train, test) = df_ratings_new.randomSplit([0.8,0.2],seed=42)

als= ALS(rank=35,\
         maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating_new",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model = als.fit(train)

                                                                                

In [12]:
pred = model.transform(test)

pred.show()

# pred_test = pred.filter(pred.prediction != float('nan'))



+------+-------+------+----------+----------+----------+
|userId|movieId|rating| timestamp|rating_new|prediction|
+------+-------+------+----------+----------+----------+
|     1|    307|   5.0|1147868828|       1.0|0.77766305|
|     1|   1175|   3.5|1147868826|       0.7| 0.7663948|
|     1|   1237|   5.0|1147868839|       1.0|0.79489815|
|     1|   2012|   2.5|1147868068|       0.5| 0.5883669|
|     1|   2692|   5.0|1147869100|       1.0|0.76634437|
|     1|   3949|   5.0|1147868678|       1.0|0.77087057|
|     1|   4973|   4.5|1147869080|       0.9| 0.8004116|
|     1|   5912|   3.0|1147878698|       0.6|0.77288365|
|     1|   7318|   2.0|1147879850|       0.4|0.57639796|
|     1|   7323|   3.5|1147869119|       0.7| 0.7598611|
|     1|   7327|   3.5|1147868855|       0.7| 0.7910724|
|     1|   7365|   4.0|1147869033|       0.8| 0.7385984|
|     1|   7937|   3.0|1147878055|       0.6|0.76165444|
|     1|   8014|   3.5|1147869155|       0.7| 0.7924493|
|     1|   8786|   4.0|11478778

                                                                                

In [13]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating_new", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating_new", predictionCol="prediction")

rmse = eval_rmse.evaluate(pred)
r2 = eval_r2.evaluate(pred)
print(f"RMSE:{rmse}")
print(f"R square:{r2}")

[Stage 905:>                                                        (0 + 2) / 2]

RMSE:0.16732388624301475
R square:0.3781462887623034


                                                                                

###### df_boxcox

In [15]:
(train, test) = df_boxcox.randomSplit([0.8,0.2],seed=42)

als= ALS(rank=35,\
         maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rc_bc",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_ = als.fit(train)

                                                                                

In [16]:
pred = model_.transform(test)

pred.show()

# pred_test = pred.filter(pred.prediction != float('nan'))



+------+-------+------------------+----------+
|userId|movieId|             rc_bc|prediction|
+------+-------+------------------+----------+
|     1|    307| 7.563974611605411|  3.985095|
|     1|   1175| 4.006240099607991|  7.159772|
|     1|   1237| 7.563974611605411| 6.0973177|
|     1|   2012|2.0799605771334155|  2.837972|
|     1|   2692| 7.563974611605411| 7.5876455|
|     1|   3949| 7.563974611605411|  4.739518|
|     1|   4973| 6.295084573613697|  7.661487|
|     1|   5912|2.9950060072720857| 5.3836765|
|     1|   7318| 1.268483561272075| 2.5669513|
|     1|   7323| 4.006240099607991| 4.7378273|
|     1|   7327| 4.006240099607991|  4.497752|
|     1|   7365| 5.107848112348889|  4.410076|
|     1|   7937|2.9950060072720857|  5.786151|
|     1|   8014| 4.006240099607991| 5.2872214|
|     1|   8786| 5.107848112348889|  4.243331|
|     1|  32591| 7.563974611605411|  4.325614|
|     3|      1| 5.107848112348889|  4.761226|
|     3|    111| 5.107848112348889|  5.883011|
|     3|    2

                                                                                

In [17]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rc_bc", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rc_bc", predictionCol="prediction")

rmse = eval_rmse.evaluate(pred)
r2 = eval_r2.evaluate(pred)
print(f"RMSE:{rmse}")
print(f"R square:{r2}")

[Stage 1412:>                                                       (0 + 2) / 2]

RMSE:1.5836645170756538
R square:0.431305916025155


                                                                                

###### df_ratings

In [None]:
(train, test) = df_ratings.randomSplit([0.8,0.2],seed=42)

als= ALS(rank=35,\
         maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model = als.fit(train)

In [None]:
pred = model.transform(test)

pred.show()

# pred_test = pred.filter(pred.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse = eval_rmse.evaluate(pred)
r2 = eval_r2.evaluate(pred)
print(f"RMSE:{rmse}")
print(f"R square:{r2}")

In [None]:
# df_ratings_recs_10 = model.recommendForAllUsers(10)
# df_ratings_recs_10.printSchema()
df_ratings_user_recs_10 = df_ratings_recs_10.select("userId","recommendations.movieId", "recommendations.rating").toPandas()


In [None]:
df_ratings_user_recs_10
df_ratings_user_recs_10.to_csv("0731df_ratings_user_recs_10.csv")

In [None]:
df_ratings_recs_30 = model.recommendForAllUsers(30)
# user_recs.describe().show()
# user_recs_30.show(truncate=False)
df_ratings_recs_30.printSchema()
#get all user recommendation df
df_ratings_user_recs_30 = df_ratings_recs_30.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_ratings_user_recs_30
df_ratings_user_recs_30.to_csv("0731df_ratings_user_recs_30.csv")

In [None]:
df_ratings_recs_50 = model.recommendForAllUsers(50)
# user_recs.describe().show()
# user_recs_30.show(truncate=False)
df_ratings_recs_50.printSchema()
#get all user recommendation df
df_ratings_user_recs_50 = df_ratings_recs_50.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_ratings_user_recs_50
df_ratings_user_recs_50.to_csv("0731df_ratings_user_recs_50.csv")

In [None]:
model.write().save("file:///home/yutinglin/ml-25m/ALS_df_ratings0731")

In [None]:
user_recs_30 = model.recommendForAllUsers(30)
# user_recs.describe().show()
# user_recs_30.show(truncate=False)
user_recs_30.printSchema()
#get all user recommendation df
df_user_recs_30 = user_recs_30.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs_30
df_user_recs_30.to_csv("0730df_user_recs_30.csv")

In [None]:
user_1 = test.filter(test['userId'] == 1 ).select(['movieId','userId'])
user_1.show()

In [None]:
rec = model.transform(user_1) 
rec.orderBy("Prediction",ascending=False).show() 

In [None]:
rec.join(df_movies, rec.movieId == df_movies.movieId, 'inner')\
.select(rec["movieId"], rec["userId"], rec["prediction"], df_movies["title"],df_movies["genres"])\
.distinct()\
.orderBy(rec["prediction"],ascending=False)\
.show()

# .orderBy(rec["prediction"],ascending=False)\
# .sort(rec["prediction"].desc())\

###### df_after2015

In [None]:
df_after2015=spark.read.parquet('file:///home/yutinglin/ml-25m/df_after2015.csv') 

In [None]:
df_after2015.describe().show()

In [None]:
(train, test) = df_after2015.randomSplit([0.7,0.3],seed=42)

als_2= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_2 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_2 = als_2.fit(train)

In [None]:
pred_2 = model_2.transform(test)

pred_2.show()

pred_test_2 = pred_2.filter(pred_2.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_2 = eval_rmse.evaluate(pred_test_2)
r2_2 = eval_r2.evaluate(pred_test_2)
print(f"RMSE:{rmse_2}")
print(f"R square:{r2_2}")

###### df_HF_movies

In [23]:
df_HF_movies=spark.read.parquet('file:///home/yutinglin/ml-25m/df_HF_movies.csv') 

In [24]:
df_HF_movies.describe().show()



+-------+------------------+-----------------+------------------+------------------+
|summary|           movieId|           userId|            rating|             count|
+-------+------------------+-----------------+------------------+------------------+
|  count|          24443380|         24443380|          24443380|          24443380|
|   mean|19699.994239585525|81216.30976935268| 3.543231950736764|15263.862510503866|
| stddev| 36767.52046795476|46808.40630677884|1.0572653075919871|16469.706188953154|
|    min|                 1|                1|               0.5|               100|
|    max|            205383|           162541|               5.0|             81491|
+-------+------------------+-----------------+------------------+------------------+



                                                                                

In [26]:
(train, test) = df_HF_movies.randomSplit([0.8,0.2],seed=42)

als_3= ALS(rank=35,\
         maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_3 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_3 = als_3.fit(train)

                                                                                

In [27]:
pred_3 = model_3.transform(test)

pred_3.show()

# pred_test_3 = pred_3.filter(pred_3.prediction != float('nan'))

                                                                                

+-------+------+------+-----+----------+
|movieId|userId|rating|count|prediction|
+-------+------+------+-----+----------+
|    306|     1|   3.5| 7058| 4.6171393|
|    665|     1|   5.0| 1269|  3.969222|
|   2351|     1|   4.5| 1290| 3.4489627|
|   4422|     1|   3.0|  780| 3.8485675|
|   4973|     1|   4.5|34320| 4.8760133|
|   6370|     1|   4.5| 1220| 3.7720926|
|   6377|     1|   4.0|34712| 3.9659152|
|  27266|     1|   4.5| 1375| 3.5505352|
|  27721|     1|   3.0| 2117| 3.7209132|
|     32|     3|   4.5|47054|  4.274567|
|    172|     3|   4.0|12156|  3.364945|
|    480|     3|   2.0|64144| 3.4031963|
|    593|     3|   4.0|74127| 4.1176667|
|    745|     3|   5.0|12241|  4.024233|
|   1196|     3|   4.0|57361|  4.177161|
|   1214|     3|   4.0|36357| 4.0862503|
|   1270|     3|   3.5|49595|  3.641727|
|   1676|     3|   3.5|17245| 3.4691608|
|   2021|     3|   4.0| 8990| 3.9909253|
|   2105|     3|   3.0|10277| 3.7466965|
+-------+------+------+-----+----------+
only showing top

In [29]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_3 = eval_rmse.evaluate(pred_3)
r2_3 = eval_r2.evaluate(pred_3)
print(f"RMSE:{rmse_3}")
print(f"R square:{r2_3}")

[Stage 2313:>                                                       (0 + 2) / 2]

RMSE:0.7747568179286938
R square:0.4631616565849058


                                                                                

###### df_HF_users

In [None]:
df_HF_users=spark.read.parquet('file:///home/yutinglin/ml-25m/df_HF_users.csv') 

In [None]:
df_HF_users.describe().show()

In [None]:
(train, test) = df_HF_users.randomSplit([0.8,0.2],seed=42)


als_4= ALS(rank=35,\
           maxIter=20,\
         regParam=0.03,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_4 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_4 = als_4.fit(train)

In [None]:
user_recs = model_4.recommendForAllUsers(10)
user_recs.show(truncate=False)


In [None]:
user_recs.printSchema()


In [None]:
# user_recs.printSchema()
df_user_recs = user_recs.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs

In [None]:
df_user_recs.to_csv("0730df_user_recs_10.csv")

In [None]:
user_recs_30.printSchema()


In [None]:
user_recs_30 = model_4.recommendForAllUsers(30)
# user_recs.describe().show()
# user_recs_30.show(truncate=False)
user_recs_30.printSchema()
#get all user recommendation df
df_user_recs_30 = user_recs_30.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs_30
df_user_recs_30.to_csv("0730df_user_recs_30.csv")

In [None]:
user_recs_50 = model_4.recommendForAllUsers(50)
# user_recs.describe().show()
# user_recs_30.show(truncate=False)
user_recs_50.printSchema()
#get all user recommendation df
df_user_recs_50 = user_recs_50.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs_50
df_user_recs_50.to_csv("0730df_user_recs_50.csv")

In [None]:
user_recs = model_4.recommendForAllUsers(10)
# user_recs.describe().show()
user_recs.show(truncate=False)
user_recs.printSchema()
#get all user recommendation df
df_user_recs = user_recs.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs
df_user_recs.to_csv("df_user_recs.csv")

In [None]:
model_4.write().save("file:///home/yutinglin/ml-25m/ALS_df_HF_users0730")

In [None]:


pred_4 = model_4.transform(test)

pred_4.show()


# pred_test_4 = pred_4.filter(pred_4.prediction != float('nan'))

# pred_4_countna =pred_4.filter(pred_4['prediction']!= float('nan'))
# pred_4_countna.describe().show()
# pred_4.filter(pred_4.prediction == float('nan')).count.show()

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_4 = eval_rmse.evaluate(pred_4)
r2_4 = eval_r2.evaluate(pred_4)
print(f"RMSE:{rmse_4}")
print(f"R square:{r2_4}")


# eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
# eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

# rmse_4 = eval_rmse.evaluate(pred_test_4)
# r2_4 = eval_r2.evaluate(pred_test_4)
# print(f"RMSE:{rmse_4}")
# print(f"R square:{r2_4}")


###### df_random_cut

In [None]:
df_random_cut = spark.read.csv("file:///home/yutinglin/ml-25m/ratings_new.csv", inferSchema=True, header=True)

# df_ratings.show()
df_random_cut.describe().show()

In [None]:
from pyspark.ml.recommendation import ALS, ALSModel

model = ALSModel.load("file:///home/yutinglin/ml-25m/ALS_0719")

In [None]:
(train, test) = df_random_cut.randomSplit([0.7,0.3],seed=42)
pred = model.transform(test)

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_4 = eval_rmse.evaluate(pred)
r2_4 = eval_r2.evaluate(pred)
print(f"RMSE:{rmse_4}")
print(f"R square:{r2_4}")

In [None]:
pred.describe().show()

In [None]:
user_5 = test.filter(test['userId'] == 5 ).select(['movieId','userId'])
# user_54493.show() #8
# user_2.show()
# user_3.show()
# user_1.show() #30

user_5.show()


rec = model.transform(user_5) 
rec.describe().show()
# rec.orderBy("Prediction",ascending=False).show() 
# rec.join(df_movies, rec.movieId == df_movies.movieId, 'inner')\
# .select(rec["movieId"], rec["userId"], rec["prediction"], df_movies["title"],df_movies["genres"])\
# .distinct()\
# .orderBy(rec["prediction"],ascending=False)\
# .show()

In [None]:
(train, test) = df_HF_users.randomSplit([0.8,0.2],seed=42)


als_4= ALS(rank=45,\
           maxIter=5,\
         regParam=0.06,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="rating",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_4 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_4 = als_4.fit(train)

In [None]:


pred_4 = model_4.transform(test)

pred_4.show()

pred_test_4 = pred_4.filter(pred_4.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_4 = eval_rmse.evaluate(pred_test_4)
r2_4 = eval_r2.evaluate(pred_test_4)
print(f"RMSE:{rmse_4}")
print(f"R square:{r2_4}")


###### df_preference

In [18]:
df_preference=spark.read.parquet('file:///home/yutinglin/ml-25m/df_preference.csv') 

                                                                                

In [19]:
df_preference.describe().show()

[Stage 1414:>                                                       (0 + 2) / 3]

+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|          rating_var|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         13991750|          13991750|          13991750|            13991750|
|   mean|81123.15314345954|19100.233544838924|3.5519471116908177| 0.15357257778576625|
| stddev|46871.26435112156| 37084.39010315628|0.9943638843782955|0.033202895399596456|
|    min|                1|                 1|               0.5|                 0.1|
|    max|           162537|            209171|               5.0|          0.21999626|
+-------+-----------------+------------------+------------------+--------------------+



                                                                                

In [20]:
(train, test) = df_preference.randomSplit([0.8,0.2],seed=42)


als_5= ALS(rank=30,\
           maxIter=20,\
           regParam=0.03,\
           userCol="userId",\
           itemCol="movieId",\
           ratingCol="rating",\
           nonnegative = True,\
           coldStartStrategy="drop",\
           implicitPrefs = False)

# als_5= ALS(maxIter=5,\
#          regParam=0.01,\
#          userCol="userId",\
#          itemCol="movieId",\
#          ratingCol="rating",\
#          nonnegative = True,\
#          coldStartStrategy="drop",\
#          implicitPrefs = False)

# als_5 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_5 = als_5.fit(train)

                                                                                

In [21]:
pred_5 = model_5.transform(test)

pred_5.show()

# pred_test_5 = pred_5.filter(pred_5.prediction != float('nan'))

[Stage 1560:>               (0 + 2) / 2][Stage 1562:>               (0 + 0) / 2]

+------+-------+------+-----------+----------+
|userId|movieId|rating| rating_var|prediction|
+------+-------+------+-----------+----------+
|    31|   1580|   3.0| 0.10436722| 2.3142946|
|    41|   1580|   4.0| 0.13040838|  3.856693|
|    57|   1580|   3.0| 0.16570607| 3.4143286|
|    91|   8638|   3.0|  0.1430564|  3.488011|
|    91|  96488|   2.0|  0.1430564|  4.266459|
|    93|  44022|   4.0| 0.20356104| 3.2890494|
|   132|   1238|   5.0| 0.18421452| 3.3713605|
|   132|   1959|   4.0| 0.18421452|  3.356423|
|   164|   8638|   4.0|   0.125008|  4.562225|
|   177|   1580|   3.0|  0.1530282| 3.8028038|
|   196|   6620|   5.0| 0.11311774|  4.093434|
|   202|   1580|   3.5|  0.1576263| 2.6592438|
|   207|   8638|   3.0| 0.10863083| 3.3166416|
|   262|    471|   4.0| 0.16335155| 2.6916325|
|   285|    471|   4.0| 0.18446364| 2.6473346|
|   316|   1580|   4.5|0.114998415| 4.3148727|
|   318|    471|   4.0| 0.11349618| 3.9058995|
|   322|   1645|   4.0| 0.13772674|  3.766097|
|   359|   31

                                                                                

In [None]:
user_1 = test.filter(test['userId'] == 1 ).select(['movieId','userId'])
# user_54493.show() #8
# user_2.show()
# user_3.show()
user_1.show() #30
#user_5.show() #23

# user_5.show()


rec = model_5.transform(user_1) 
rec.describe().show()

rec.join(df_movies, rec.movieId == df_movies.movieId, 'inner')\
.select(rec["movieId"], rec["userId"], rec["prediction"], df_movies["title"],df_movies["genres"])\
.distinct()\
.orderBy(rec["prediction"],ascending=False)\
.show()

In [22]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")

rmse_5 = eval_rmse.evaluate(pred_5)
r2_5 = eval_r2.evaluate(pred_5)
print(f"RMSE:{rmse_5}")
print(f"R square:{r2_5}")




RMSE:0.744103950046645
R square:0.4390072936685451


                                                                                

In [None]:
pred_5.describe().show()

In [None]:
user_recs = model_5.recommendForAllUsers(10)
# user_recs.describe().show()
user_recs.show(truncate=False)
user_recs.printSchema()
#get all user recommendation df
df_user_recs = user_recs.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs
df_user_recs.to_csv("df_user_recs.csv")

In [None]:
user_recs.printSchema()

In [None]:
df_user_recs_1 = user_recs.where(user_recs.userId == userId).select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs_1

In [None]:
#get all user recommendation df
df_user_recs = user_recs.select("userId","recommendations.movieId", "recommendations.rating").toPandas()
df_user_recs

In [None]:
df_user_recs.to_csv("df_user_recs.csv")

In [None]:

#     user_recs.where(user_recs.userId == userId).select("recommendations.movieId", "recommendations.rating").collect()

# df_user_recs = user_recs.where(user_recs.userId == userId).select("userId","recommendations.movieId", "recommendations.rating").toPandas()

def users_recommendation(userId):
    recs = df_user_recs[df_user_recs['userId']==userId]
    movieid = recs['movieId'][1]
    rating = recs['rating'][1]
    rec_user = pd.DataFrame(movieid, columns = ["movieId"])
    rec_user["rating"] = rating
    return rec_user


user5 = users_recommendation(5)    
user5    
# df_user_recs.head()

# userId
# for i in user_recs.userId:
#     user1 = user_recs.where(user_recs.userId == i).select("recommendations.movieId", "recommendations.rating").toPandas()
#     user1.head()

In [None]:
# rec_list = range(11)
# movieId=[]
# movieId.append(user1['movieId'][0])
# for i in range(10):
# type(user1['movieId'][0])
# print(movieId)


movieid = pd.Series(df_user_recs['movieId'][0])
rating = pd.Series(df_user_recs['rating'][0])

rec_matrix = pd.DataFrame(movieid, columns = ["movieId"])
rec_matrix["rating"] = rating

rec_matrix
# movieid
# rating
#     movie_rec = user1['movieId'][0][i]
#first rec movieId 
# user1['movieId'][0]
#first rec movie rating
# user1['rating'][0][0]

In [None]:
df_user_recs.describe()
# type(userId)

In [None]:

# import pandas as pd


# def get_recs_for_users(recs):
#     recs = recs.select("userId","recommendations.movieId","recommendations.rating")
#     users = recs.select("userId").toPandas()
#     movies = recs.select("movieId").toPandas()
#     ratings = recs.select("rating").toPandas()
#     ratings_matrix = pd.DataFrame(users, columns = ["userId"])
#     ratings_matrix["movieId"] = movies
#     ratings_matrix["ratings"] = ratings
#     return ratings_matrix

# ratings_matrix = get_recs_for_users(user_recs)

# ratings_matrix.head()
# user_recs


In [None]:
# def get_recs_for_users(recs):
#     recs = recs.select("recommendations.movieId","recommendations.rating")
#     movies = recs.select("movieId").toPandas().iloc[0,0]
#     ratings = recs.select("rating").toPandas().iloc[0,0]
#     ratings_matrix = pd.DataFrame(movies, columns = ["movieId"])
#     ratings_matrix["ratings"] = ratings
#     ratings_matrix_ps = spark.createDataFrame(ratings_matrix)
#     return ratings_matrix_ps

###### df_pre

In [None]:
(train, test) = df_pre.randomSplit([0.7,0.3],seed=42)

als_6= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="prefer",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

model_6 = als_6.fit(train)

In [None]:
pred_6 = model_6.transform(test)

pred_6.show()

pred_test_6 = pred_6.filter(pred_6.prediction != float('nan'))

In [None]:
# pred_6.show(100)

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="prefer", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="prefer", predictionCol="prediction")

rmse_6 = eval_rmse.evaluate(pred_test_6)
r2_6 = eval_r2.evaluate(pred_test_6)
print(f"RMSE:{rmse_6}")
print(f"R square:{r2_6}")

###### df_after2015_pre

In [None]:
df_after2015_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_after2015_pre.csv') 

In [None]:
df_after2015_pre.describe().show()

In [None]:
(train, test) = df_after2015_pre.randomSplit([0.7,0.3],seed=42)

als_7= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="prefer",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_7 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_7 = als_7.fit(train)

In [None]:
pred_7 = model_7.transform(test)

pred_7.show()

pred_test_7 = pred_7.filter(pred_7.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="prefer", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="prefer", predictionCol="prediction")

rmse_7 = eval_rmse.evaluate(pred_test_7)
r2_7 = eval_r2.evaluate(pred_test_7)
print(f"RMSE:{rmse_7}")
print(f"R square:{r2_7}")

###### df_HF_movies_pre

In [None]:
df_HF_movies_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_HF_movies_pre.csv') 

In [None]:
df_HF_movies_pre.describe().show()

In [None]:
(train, test) = df_HF_movies_pre.randomSplit([0.7,0.3],seed=42)

als_8= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="prefer",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_8 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_8 = als_8.fit(train)

In [None]:
pred_8 = model_8.transform(test)

pred_8.show()

pred_test_8 = pred_8.filter(pred_8.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="prefer", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="prefer", predictionCol="prediction")

rmse_8 = eval_rmse.evaluate(pred_test_8)
r2_8 = eval_r2.evaluate(pred_test_8)
print(f"RMSE:{rmse_8}")
print(f"R square:{r2_8}")

###### df_HF_users_pre

In [None]:
df_HF_users_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_HF_users_pre.csv') 

In [None]:
df_HF_users_pre.describe().show()

In [None]:
(train, test) = df_HF_users_pre.randomSplit([0.7,0.3],seed=42)

als_9= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="prefer",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_9 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_9 = als_9.fit(train)

In [None]:
pred_9 = model_9.transform(test)

pred_9.show()

pred_test_9 = pred_9.filter(pred_9.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="prefer", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="prefer", predictionCol="prediction")

rmse_9 = eval_rmse.evaluate(pred_test_9)
r2_9 = eval_r2.evaluate(pred_test_9)
print(f"RMSE:{rmse_9}")
print(f"R square:{r2_9}")

###### df_preference_pre

In [None]:
df_preference_pre=spark.read.parquet('file:///home/yutinglin/ml-25m/df_preference_pre.csv') 

In [None]:
df_preference_pre.describe().show()

In [None]:
(train, test) = df_preference_pre.randomSplit([0.7,0.3],seed=42)

als_10= ALS(maxIter=5,\
         regParam=0.01,\
         userCol="userId",\
         itemCol="movieId",\
         ratingCol="prefer",\
         nonnegative = True,\
         coldStartStrategy="drop",\
         implicitPrefs = False)

# als_10 =ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True)

model_10 = als_10.fit(train)

In [None]:
pred_10 = model_10.transform(test)

pred_10.show()

pred_test_10 = pred_10.filter(pred_10.prediction != float('nan'))

In [None]:
eval_rmse = RegressionEvaluator(metricName="rmse", labelCol="prefer", predictionCol="prediction")
eval_r2 = RegressionEvaluator(metricName="r2", labelCol="prefer", predictionCol="prediction")

rmse_10 = eval_rmse.evaluate(pred_test_10)
r2_10 = eval_r2.evaluate(pred_test_10)
print(f"RMSE:{rmse_10}")
print(f"R square:{r2_10}")

In [None]:
#only rec 30 moives for a specific user
rec.orderBy("Prediction",ascending=False).describe().show()  

In [None]:
df_ratings.join(df_movies, df_ratings.movieId == df_movies.movieId, 'inner')\
.filter(df_ratings["userId"]==1).sort(df_ratings["rating"].desc()).select(df_ratings["movieId"], df_ratings["rating"], df_movies["title"])\
.distinct().show(truncate=False)

In [None]:

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]


In [None]:
df = spark.createDataFrame(data=data, schema = columns)
df.withColumn("Country", lit("USA")).show()
df.withColumn("Country", lit("USA")) \
  .withColumn("anotherColumn",lit("anotherValue")) \
  .show()


##### Model training(tunning)

###### df_ratings

In [None]:
df_ratings = spark.read.csv("hdfs://bdse85.example.com/tmp/ratings.csv", inferSchema=True, header=True)
df_movies = spark.read.csv("hdfs://bdse85.example.com/tmp/movies.csv", inferSchema=True, header=True)

In [None]:
#split data
(training , test) = df_ratings.randomSplit([0.8,0.2],seed=42)

als =ALS(userCol="userId", \
itemCol="movieId", \
ratingCol="rating",\
coldStartStrategy="drop", \
nonnegative = True, \
implicitPrefs = False)

#Tune model 
param_grid = ParamGridBuilder()\
             .addGrid(als.rank,[10,15,20])\
             .addGrid(als.maxIter, [5,10,15])\
             .addGrid(als.regParam, [.10,.15,.17])\
             .build()

evals_rmse = RegressionEvaluator(metricName="rmse", \
labelCol="rating", \
predictionCol="prediction")

evals_r2 = RegressionEvaluator(metricName="r2", \
labelCol="rating", \
predictionCol="prediction")


In [None]:
cv_rmse = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_rmse,
numFolds = 10)

cv_r2 = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_r2,
numFolds = 10)
# cv2 = TrainValidationSplit(
#     estimator=als,
#     estimatorParamMaps=param_grid,
#     evaluator=evals)

#fit ALS model to training data
model_rmse = cv_rmse.fit(training)

model_r2 = cv_r2.fit(training)


In [None]:
#extract best model from tuning
best_model_rmse = model_rmse.bestModel
best_model_r2 = model_r2.bestModel

predictions_rmse = best_model_rmse.transform(test)
predictions_r2 = best_model_r2.transform(test)

rmse = evals_rmse.evaluate(predictions_rmse)
r2 = evals_r2.evaluate(predictions_r2)



print(f"RMSE:{rmse}")
print(f"R square:{r2}")

print("**Best Model_RMSE**")
print(" Rank:", best_model_rmse.rank)
print(" MaxIter:", best_model_rmse._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_rmse._java_obj.parent().getRegParam() )

print("**Best Model_R square**")
print(" Rank:", best_model_r2.rank)
print(" MaxIter:", best_model_r2._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_r2._java_obj.parent().getRegParam() )


###### df_pre

In [None]:
df_pre =spark.read.parquet("hdfs://bdse50.example.com/tmp/df_pre.csv", inferSchema=True, header=True)
df_movies = spark.read.csv("hdfs://bdse50.example.com/tmp/movies.csv", inferSchema=True, header=True)

In [None]:
df_pre.describe().show()

In [None]:
#split data
(training , test) = df_pre.randomSplit([0.8,0.2],seed=42)

als =ALS(userCol="userId", \
itemCol="movieId", \
ratingCol="prefer",\
coldStartStrategy="drop", \
nonnegative = True, \
implicitPrefs = False)

#Tune model 
param_grid = ParamGridBuilder()\
             .addGrid(als.rank,[10,15,20])\
             .addGrid(als.maxIter, [5,10,15])\
             .addGrid(als.regParam, [.10,.15,.17])\
             .build()

evals_rmse = RegressionEvaluator(metricName="rmse", \
labelCol="prefer", \
predictionCol="prediction")

evals_r2 = RegressionEvaluator(metricName="r2", \
labelCol="prefer", \
predictionCol="prediction")


In [None]:
cv_rmse = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_rmse,
numFolds = 10)

cv_r2 = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_r2,
numFolds = 10)
# cv2 = TrainValidationSplit(
#     estimator=als,
#     estimatorParamMaps=param_grid,
#     evaluator=evals)

#fit ALS model to training data
model_rmse = cv_rmse.fit(training)

model_r2 = cv_r2.fit(training)


In [None]:
#extract best model from tuning
best_model_rmse = model_rmse.bestModel
best_model_r2 = model_r2.bestModel

predictions_rmse = best_model_rmse.transform(test)
predictions_r2 = best_model_r2.transform(test)

rmse = evals_rmse.evaluate(predictions_rmse)
r2 = evals_r2.evaluate(predictions_r2)



print(f"RMSE:{rmse}")
print(f"R square:{r2}")

print("**Best Model_RMSE**")
print(" Rank:", best_model_rmse.rank)
print(" MaxIter:", best_model_rmse._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_rmse._java_obj.parent().getRegParam() )

print("**Best Model_R square**")
print(" Rank:", best_model_r2.rank)
print(" MaxIter:", best_model_r2._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_r2._java_obj.parent().getRegParam() )


###### df_HF_users

In [None]:
df_HF_users =spark.read.parquet("hdfs://bdse50.example.com/tmp/df_HF_users.csv", inferSchema=True, header=True)
df_movies = spark.read.csv("hdfs://bdse50.example.com/tmp/movies.csv", inferSchema=True, header=True)

In [None]:
df_HF_users.describe().show()

In [None]:
#split data
(training , test) = df_HF_users.randomSplit([0.8,0.2],seed=42)

als =ALS(userCol="userId", \
itemCol="movieId", \
ratingCol="rating",\
coldStartStrategy="drop", \
nonnegative = True, \
implicitPrefs = False)

#Tune model 
param_grid = ParamGridBuilder()\
             .addGrid(als.rank,[15,20,25])\
             .addGrid(als.maxIter, [10,15,20])\
             .addGrid(als.regParam, [.06,.08,.10])\
             .build()

evals_rmse = RegressionEvaluator(metricName="rmse", \
labelCol="rating", \
predictionCol="prediction")

evals_r2 = RegressionEvaluator(metricName="r2", \
labelCol="rating", \
predictionCol="prediction")


In [None]:
cv_rmse = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_rmse,
numFolds = 10)

cv_r2 = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evals_r2,
numFolds = 10)
# cv2 = TrainValidationSplit(
#     estimator=als,
#     estimatorParamMaps=param_grid,
#     evaluator=evals)

#fit ALS model to training data
model_rmse = cv_rmse.fit(training)

model_r2 = cv_r2.fit(training)


In [None]:
#extract best model from tuning
best_model_rmse = model_rmse.bestModel
best_model_r2 = model_r2.bestModel

predictions_rmse = best_model_rmse.transform(test)
predictions_r2 = best_model_r2.transform(test)

rmse = evals_rmse.evaluate(predictions_rmse)
r2 = evals_r2.evaluate(predictions_r2)



print(f"RMSE:{rmse}")
print(f"R square:{r2}")

print("**Best Model_RMSE**")
print(" Rank:", best_model_rmse.rank)
print(" MaxIter:", best_model_rmse._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_rmse._java_obj.parent().getRegParam() )

print("**Best Model_R square**")
print(" Rank:", best_model_r2.rank)
print(" MaxIter:", best_model_r2._java_obj.parent().getMaxIter())
print(" RegParam:", best_model_r2._java_obj.parent().getRegParam() )


In [None]:
model.write().save("file:///home/yutinglin/ml-25m/ALS_0719")

In [None]:
model = ALSModel.load("????path???ALS_0719")

In [None]:
predictions1 = best_model1.transform(test)
rmse1 = evals.evaluate(predictions1)

print(f"RMSE:{rmse1}")
print("**Best Model**")
print(" Rank:"), best_model1.rank
print(" MaxIter:"), best_model1._java_obj.parent().getMaxIter()
print(" RegParam:"), best_model1._java_obj.parent().getRegParam() 

In [None]:

r2 = evals.evaluate(predictions1)

print(f"R square:{r2}")
print("**Best Model**")
print(" Rank:", best_model1.rank)
print(" MaxIter:", best_model1._java_obj.parent().getMaxIter())
print(" RegParam:", best_model1._java_obj.parent().getRegParam() )

In [None]:
 best_model1._java_obj.parent().getMaxIter()

In [None]:

print(f"RMSE:{rmse1}")
print("**Best Model**")
print(" Rank:", best_model1.rank)
print(" MaxIter:", best_model1._java_obj.parent().getMaxIter())
print(" RegParam:", best_model1._java_obj.parent().getRegParam() )