In [1]:
import os
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
datasets_path = os.path.join('..','movie_recommendation_system', 'datasets')
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
tags_file = os.path.join(datasets_path, 'ml-latest', 'tags.csv')
links_file = os.path.join(datasets_path, 'ml-latest','links.csv')
movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')

In [2]:
spark = SparkSession.builder.getOrCreate()
"""
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "50g") \
    .config("spark.submit.deployMode", "local") \
    .config("spark.executor.memory", "200g") \
    .config('spark.driver.maxResultSize', '200G') \
    .config('spark.executor.cores', '5')\
    .config('spark.memory.fraction ', '0')\
    .config('spark.memory.offHeap.enabled', 'true')\
    .config('spark.memory.offHeap.size', '2g')\
    .appName('movie-recomendation') \
    .getOrCreate()
"""

'\nspark = SparkSession.builder     .master(\'local[*]\')     .config("spark.driver.memory", "50g")     .config("spark.submit.deployMode", "local")     .config("spark.executor.memory", "200g")     .config(\'spark.driver.maxResultSize\', \'200G\')     .config(\'spark.executor.cores\', \'5\')    .config(\'spark.memory.fraction \', \'0\')    .config(\'spark.memory.offHeap.enabled\', \'true\')    .config(\'spark.memory.offHeap.size\', \'2g\')    .appName(\'movie-recomendation\')     .getOrCreate()\n'

In [3]:
sc.getConf().getAll()

[('spark.driver.host', '192.168.1.73'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.driver.port', '40381'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1617849580041'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.ui.showConsoleProgress', 'true')]

In [4]:
#
conf1 = pyspark.SparkConf().setAll([('spark.executor.memory', '10p'), ('spark.driver.maxResultSize', '10p'),('spark.executor.cores', '10'), 
                                    ('spark.cores.max', '10'), ('spark.driver.memory','50g'), 
                                    ('spark.memory.fraction', '0.01'), ('spark.submit.deployMode', 'cluster')])
sc.stop()
sc = pyspark.SparkContext(conf=conf1)
spark = SparkSession.builder.config(conf=conf1).getOrCreate()
sc.getConf().getAll()

[('spark.driver.memory', '50g'),
 ('spark.driver.host', '192.168.1.73'),
 ('spark.submit.deployMode', 'cluster'),
 ('spark.executor.id', 'driver'),
 ('spark.executor.memory', '10p'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.executor.cores', '10'),
 ('spark.memory.fraction', '0.01'),
 ('spark.cores.max', '10'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.driver.port', '34273'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.maxResultSize', '10p'),
 ('spark.app.id', 'local-1617849581901')]

In [5]:
#Schema design which are as same as csv
ratingschema = StructType()\
    .add("userId", IntegerType(), True)\
    .add("movieId", IntegerType(), True)\
    .add("rating", FloatType(), True)\
    .add("timeStamp", IntegerType(), True)
tagsschema = StructType()\
    .add("userId", IntegerType(), True)\
    .add("movieId", IntegerType(), True)\
    .add("tag", StringType(), True)\
    .add("timeStamp", IntegerType(), True)
linkschema = StructType()\
    .add("movieId", IntegerType(), True)\
    .add("IMDBID", IntegerType(), True)\
    .add("TMDBID", IntegerType(), True)

In [6]:
ratingdf = spark.read.format("csv")\
    .option("header",True)\
    .schema(ratingschema)\
    .load(complete_ratings_file)
tagdf = spark.read.format("csv")\
    .option("header",True)\
    .schema(tagsschema)\
    .load(tags_file)
linkdf = spark.read.format("csv")\
    .option("header",True)\
    .schema(linkschema)\
    .load(links_file)

In [7]:
ratingdf.show(), tagdf, linkdf

+------+-------+------+----------+
|userId|movieId|rating| timeStamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
|     1|   1590|   2.5|1256677236|
|     1|   1591|   1.5|1256677475|
|     1|   2134|   4.5|1256677464|
|     1|   2478|   4.0|1256677239|
|     1|   2840|   3.0|1256677500|
|     1|   2986|   2.5|1256677496|
|     1|   3020|   4.0|1256677260|
|     1|   3424|   4.5|1256677444|
|     1|   3698|   3.5|1256677243|
|     1|   3826|   2.0|1256677210|
|     1|   3893|   3.5|1256677486|
|     2|    170|   3.5|1192913581|
|     2|    849|   3.5|1192913537|
|     2|   1186|   3.5|1192913611|
|     2|   1235|   3.0|1192913585|
+------+-------+------+----------+
only showing top 20 rows



(None,
 DataFrame[userId: int, movieId: int, tag: string, timeStamp: int],
 DataFrame[movieId: int, IMDBID: int, TMDBID: int])

In [8]:
movie = spark.read.option("inferSchema", "true")\
    .option("header", "true").csv(movies_file)
movie

DataFrame[movieId: int, title: string, genres: string]

In [9]:
moviedf = movie.drop("genres")
genredf = movie.drop("title")
userdf = ratingdf.drop("movieId", "rating", "timeStamp")
userdf = userdf.select('userId').distinct()

In [10]:
moviedf, genredf, userdf

(DataFrame[movieId: int, title: string],
 DataFrame[movieId: int, genres: string],
 DataFrame[userId: int])

we need to connect Redis here

# ALS: collaborative filtering for explicit data

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [12]:
ratingforals = ratingdf.drop("timeStamp")
ratingforals.show()
#spark.driver.memoryOverhead
#spark.executor.memoryOverhead

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
|     1|   2986|   2.5|
|     1|   3020|   4.0|
|     1|   3424|   4.5|
|     1|   3698|   3.5|
|     1|   3826|   2.0|
|     1|   3893|   3.5|
|     2|    170|   3.5|
|     2|    849|   3.5|
|     2|   1186|   3.5|
|     2|   1235|   3.0|
+------+-------+------+
only showing top 20 rows



In [14]:
training = ratingforals.limit(10)
training.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
+------+-------+------+



In [15]:

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
# c
als = ALS(maxIter=3, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", 
          coldStartStrategy="drop")
model = als.fit(training)



In [16]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [27]:
users = training.select(als.getUserCol()).distinct().limit(3)
users.show()

+------+
|userId|
+------+
|     1|
+------+



In [28]:
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[[2134, 4.4960732...|
+------+--------------------+



error message: 
1. when executor = 50g, 200g: Job aborted due to stage failure: Task 1 in stage 7.0 failed 1 times, most recent failure: Lost task 1.0 in stage 7.0 (TID 22, 192.168.1.73, executor driver): java.lang.OutOfMemoryError: Java heap space
2. additional maxreusltsize = 20G: Py4JJavaError: An error occurred while calling o145.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 1 times, most recent failure: Lost task 6.0 in stage 7.0 (TID 27, 192.16An error occurred while calling o381.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 21, 192.168.1.73, executor driver): java.lang.OutOfMemoryError: Java heap space8.1.73, executor driver): java.lang.OutOfMemoryError: Java heap space\
maybe this:
https://knowledge.informatica.com/s/article/567632?language=en_US\
An error occurred while calling o381.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 21, 192.168.1.73, executor driver): java.lang.OutOfMemoryError: Java heap space

https://spark.apache.org/docs/2.3.1/ml-collaborative-filtering.html