# Big Data Final Assignment #
##### CSC-8101 #####
#### Randy J. Rodriguez Collado ####

## Task 1 ##

In [None]:
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import re
from pyspark.sql.functions import udf

NETFLIX_MOVIE_PATH="../../data/netflix_movie_titles.txt"
MOVIE_CANOCICAL_PATH="../../data/movie_titles_canonical.txt"

In [99]:
netflix_movie_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("year", IntegerType(), False),
    StructField("name", StringType(), False)])

netflix_movie_df = spark.read.csv(NETFLIX_MOVIE_PATH,header=False,schema=netflix_movie_schema,mode='DROPMALFORMED', multiLine=True)
netflix_movie_df

DataFrame[id: int, year: int, name: string]

In [100]:
canonical_schema = StructType([
    StructField("name", StringType(), False),
    StructField("year", IntegerType(), False)])

canonical_df = spark.read.csv(MOVIE_CANOCICAL_PATH,header=False,schema=canonical_schema,mode='DROPMALFORMED', multiLine=True)#.rdd.map(lambda i: (i[1],i[0]))
canonical_df

DataFrame[name: string, year: int]

In [101]:
netflix_movie_df = netflix_movie_df.orderBy(netflix_movie_df.year)
netflix_movie_df.show(10)

+-----+----+--------------------+
|   id|year|                name|
+-----+----+--------------------+
| 7654|1896|Lumiere Brothers'...|
| 4975|1909|D.W. Griffith: Ye...|
| 9103|1914|Tillie's Puncture...|
|10898|1914|             Cabiria|
| 8821|1915|The Birth of a Na...|
|14687|1915|Chaplin's Essanay...|
| 3137|1915|        Les Vampires|
| 9001|1915|Chaplin's Essanay...|
|13147|1915|Chaplin's Essanay...|
| 3387|1916|         Intolerance|
+-----+----+--------------------+
only showing top 10 rows



In [102]:
max_netflix_year = netflix_movie_df.agg({"year": "max"}).collect()[0][0]
min_netflix_year = netflix_movie_df.agg({"year": "min"}).collect()[0][0]

In [103]:
canonical_df = canonical_df.filter(canonical_df.year >= min_netflix_year).filter(canonical_df.year <= max_netflix_year).orderBy(canonical_df.year.desc())

In [104]:
@udf
def clean_title(string):
    return re.sub('[^A-Za-z0-9 ]+', '', string).lower()

In [105]:
netflix_movie_df = netflix_movie_df.withColumn("title", clean_title("name")).drop("name")
netflix_movie_df.show(10)

+-----+----+--------------------+
|   id|year|               title|
+-----+----+--------------------+
| 7654|1896|lumiere brothers ...|
| 4975|1909|dw griffith years...|
| 9103|1914|tillies punctured...|
|10898|1914|             cabiria|
| 8821|1915|the birth of a na...|
|14687|1915|chaplins essanay ...|
| 3137|1915|        les vampires|
| 9001|1915|chaplins essanay ...|
|13147|1915|chaplins essanay ...|
| 3387|1916|         intolerance|
+-----+----+--------------------+
only showing top 10 rows



In [106]:
canonical_df = canonical_df.withColumn("title", clean_title("name"))
canonical_df.show(10)

+--------------------+----+--------------------+
|                name|year|               title|
+--------------------+----+--------------------+
|   War of the Worlds|2005|   war of the worlds|
|       Batman Begins|2005|       batman begins|
|         Match Point|2005|         match point|
|Charlie and the C...|2005|charlie and the c...|
|  Brokeback Mountain|2005|  brokeback mountain|
|             Jarhead|2005|             jarhead|
|The Secret Life o...|2005|the secret life o...|
|A History of Viol...|2005|a history of viol...|
|     The Interpreter|2005|     the interpreter|
|       Walk the Line|2005|       walk the line|
+--------------------+----+--------------------+
only showing top 10 rows



In [107]:
joined_df = netflix_movie_df.join(canonical_df, (netflix_movie_df.year == canonical_df.year) & (netflix_movie_df.title == canonical_df.title))
joined_df = joined_df.drop("title").drop("year")
joined_df.show(10)

+-----+--------------------+
|   id|                name|
+-----+--------------------+
|10898|             Cabiria|
| 8821|The Birth of a Na...|
| 3387|         Intolerance|
| 9898|Dr. Jekyll and Mr...|
| 9898|Dr. Jekyll and Mr...|
|11825|             The Kid|
|11318| Nanook of the North|
| 3554|      The Last Laugh|
| 4249| Battleship Potemkin|
| 5963|       The Gold Rush|
+-----+--------------------+
only showing top 10 rows



In [108]:
joined_df.count()

3648

## Task 2 ##

In [2]:
from pyspark.ml.recommendation import ALS 
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SQLContext

SAMPLE_PARQUET_PATH="../../data/mv_sampled.parquet"

In [3]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
sample_data_df = sqlContext.read.parquet(SAMPLE_PARQUET_PATH).drop('timestamp')
sample_data_df

DataFrame[movieId: bigint, rating: double, userId: bigint]

In [5]:
(train_df, test_df) = sample_data_df.randomSplit([0.8,0.2])
train_df

DataFrame[movieId: bigint, rating: double, userId: bigint]

In [6]:
als_param = ALS(rank=10,maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',ratingCol='rating',coldStartStrategy='drop')
als_model = als_param.fit(train_df)
als_model.rank

10

In [7]:
predictions = als_model.transform(test_df)
predictions.show(10)

+-------+------+-------+-----------+
|movieId|rating| userId| prediction|
+-------+------+-------+-----------+
|    148|   5.0|2528213|  0.8507967|
|    148|   5.0|1892759|-0.72667825|
|    148|   2.0|2614093|  -3.554331|
|    148|   2.0| 412221| -1.6509634|
|    148|   1.0|2589259| -4.5433054|
|    148|   5.0|1712386|  3.8130896|
|    148|   2.0|2157060|  0.5814957|
|    148|   3.0|1606487| -2.7101128|
|    148|   3.0| 146307| -2.2028742|
|    148|   2.0|2333531|0.101098344|
+-------+------+-------+-----------+
only showing top 10 rows



## Task 3 ##

In [119]:
#Question, is there a way to not recommend for all users?
all_users_recommendations = als_model.recommendForAllUsers(10)
all_users_recommendations.persist()
raw_recommendations = all_users_recommendations.filter(all_users_recommendations.userId == 30878).collect()[0]['recommendations']

In [120]:
recommendations_df = spark.createDataFrame(raw_recommendations)
recommendations_df.show(10)

+-------+------------------+
|movieId|            rating|
+-------+------------------+
|   8709|27.525110244750977|
|   5632|23.876440048217773|
|  11860|23.497241973876953|
|  12658| 23.28026580810547|
|  10439|22.855117797851562|
|   5831|22.717992782592773|
|   5139| 22.64234733581543|
|  11641|        21.9609375|
|   3838|21.103553771972656|
|  13258|20.970252990722656|
+-------+------------------+



In [121]:
user_recommendations_df = netflix_movie_df.join(recommendations_df, netflix_movie_df.id == recommendations_df.movieId).select(netflix_movie_df.id, netflix_movie_df.title, recommendations_df.rating)
user_recommendations_df.show(10)

+-----+--------------------+------------------+
|   id|               title|            rating|
+-----+--------------------+------------------+
| 8709|        mr wonderful|27.525110244750977|
| 5632|midsomer murders ...|23.876440048217773|
|11860|bruce lee fists o...|23.497241973876953|
|12658|the vicar of dibl...| 23.28026580810547|
|10439|          the circus|22.855117797851562|
| 5831|     bluehill avenue|22.717992782592773|
| 5139|a simple twist of...| 22.64234733581543|
|11641|    my socalled life|        21.9609375|
| 3838|cardcaptor sakura...|21.103553771972656|
|13258|                gozu|20.970252990722656|
+-----+--------------------+------------------+



In [122]:
user_recommendations_df.coalesce(1).rdd.map(tuple).saveAsTextFile('assignemnt_task3')

## Task 4 ##

In [87]:
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error = %g" % rmse)

Root Mean Square Error = 6.24412


## Task 5 ##

In [88]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [89]:
als_rating_param = ALS(userCol='userId',itemCol='movieId',ratingCol='rating',coldStartStrategy='drop')
param_grid = ParamGridBuilder().addGrid(als_rating_param.rank, [15]).addGrid(als_rating_param.maxIter, [10]).addGrid(als_rating_param.regParam, [0.05,0.1]).build()
regression_eval = RegressionEvaluator()
regression_eval.setMetricName("rmse")
regression_eval.setPredictionCol("prediction")
regression_eval.setLabelCol("rating")

RegressionEvaluator_4150a2ee186c0baca84d

In [90]:
cross_validator = CrossValidator(estimator=als_rating_param, estimatorParamMaps=param_grid, evaluator=regression_eval, numFolds=2)
cv_model = cross_validator.fit(train_df)

In [91]:
cv_results_df = cv_model.transform(test_df)
cv_results_df.show(10)

+-------+------+-------+----------+
|movieId|rating| userId|prediction|
+-------+------+-------+----------+
|    148|   5.0|2528213|  1.738977|
|    148|   5.0|1892759| 0.6468946|
|    148|   2.0|2614093| 2.5850089|
|    148|   2.0| 412221| 1.9417876|
|    148|   1.0|2589259| 2.5654597|
|    148|   5.0|1712386| 2.4267342|
|    148|   2.0|2157060| 2.9371736|
|    148|   3.0|1606487| 1.7900494|
|    148|   3.0| 146307| 1.8086294|
|    148|   2.0|2333531|0.46258342|
+-------+------+-------+----------+
only showing top 10 rows



In [92]:
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(cv_results_df)
print("Root Mean Square Error = %g" % rmse)

Root Mean Square Error = 1.48549


## Task 6 ##

In [93]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
QUALIFYING_SIMPLE_PATH="../../data/qualifying_simple.txt"

In [94]:
qualifying_simple_schema = StructType([
    StructField("userId", IntegerType(), False),
    StructField("movieId", IntegerType(), False),
    StructField("date", StringType(), False)])

qualifying_simple_df = spark.read.csv(QUALIFYING_SIMPLE_PATH,header=False,schema=qualifying_simple_schema,mode='DROPMALFORMED', multiLine=True)
qualifying_simple_df.show(10)

+------+-------+----------+
|userId|movieId|      date|
+------+-------+----------+
|     1|1046323|2005-12-19|
|     1|1080030|2005-12-23|
|     1|1830096|2005-03-14|
|     1| 368059|2005-05-26|
|     1| 802003|2005-11-07|
|     1| 513509|2005-07-04|
|     1|1086137|2005-09-21|
|     1| 428698|2005-12-20|
|     1| 515850|2005-11-27|
|     1| 131974|2005-12-15|
+------+-------+----------+
only showing top 10 rows



In [95]:
qualifying_predictions =  als_model.transform(qualifying_simple_df)
qualifying_predictions.show(10)

+------+-------+----------+-----------+
|userId|movieId|      date| prediction|
+------+-------+----------+-----------+
| 10612|    471|2005-09-04|  -9.926776|
|  7745|   2122|2005-11-16|  1.9553128|
|  2612|   2659|2005-12-21|   -6.10647|
|  3725|  10817|2005-10-27|  16.000393|
|  1590|  14832|2005-09-19| -2.0345006|
|  6797|  15790|2005-08-23|-0.27778524|
| 14230|  16386|2005-12-12|-0.37058395|
|  1798|    623|2005-08-15|-0.29319063|
|  5862|   8803|2005-09-26| -1.4021175|
| 16201|  10230|2005-12-21|  -2.931856|
+------+-------+----------+-----------+
only showing top 10 rows



In [97]:
qualifying_predictions.rdd.map(tuple).saveAsTextFile('qualifying_predictions')

## Task 7 ##

In [83]:
comparison_user_df = cv_results_df.filter(cv_results_df.userId == 30878).filter(cv_results_df.prediction >= 3).orderBy(cv_results_df.movieId.asc()).drop('rating').drop('userId')
comparison_user_df.show(10)

+-------+----------+
|movieId|prediction|
+-------+----------+
|   2342| 1.9457794|
|   3579| 2.8052087|
|  10359| 3.1876295|
+-------+----------+



In [84]:
compare_users_df = cv_results_df.filter((cv_results_df.userId != 30878) & (cv_results_df.prediction >= 3)).orderBy(cv_results_df.movieId.asc()).drop('rating')
compare_users_df.show(10)

+-------+-------+----------+
|movieId| userId|prediction|
+-------+-------+----------+
|      1|1017324| 2.3924155|
|      3|1153447|  1.819693|
|      5|1170034|  2.019347|
|      5| 590466|  3.332409|
|      6| 381588| 2.8389723|
|      6|  52856|   2.39281|
|      8|1308307| 1.3842506|
|      8|2571885| 1.9592941|
|      8| 257806| 1.7223551|
|      8| 444417| 1.6315118|
+-------+-------+----------+
only showing top 10 rows



In [85]:
joined_df = compare_users_df.join(comparison_user_df, comparison_user_df.movieId == compare_users_df.movieId).select(compare_users_df.userId, compare_users_df.movieId)
joined_df = joined_df.groupBy(compare_users_df.userId).count().orderBy("count", ascending=False)
joined_df.show(10)

+-------+-----+
| userId|count|
+-------+-----+
| 540833|    2|
| 734490|    1|
| 784765|    1|
|1807031|    1|
|  34408|    1|
|1545960|    1|
|2042978|    1|
|2344564|    1|
|2606295|    1|
| 813730|    1|
+-------+-----+
only showing top 10 rows

