## Data 612 Project 5
#### Group member : Wei Zhou / Mia Chen

In [2]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello ")

df.show()



+-----+
|hello|
+-----+
|spark|
+-----+



In [65]:
# read ratings CSV
ratings_df = spark.read.csv('../project4/ml-100k/u.data',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="\t",           # char for separation
                         inferSchema=True
                           )  # do we infer schema or not ?
ratings_df.withColumnRenamed("196","user_id")\
                      .withColumnRenamed("242","movie_id")\
                      .withColumnRenamed("3","rating")\
                      .withColumnRenamed("881250949","timestamp")\
                      .printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [51]:
ratings = ratings_df.rdd

print(ratings)

numRatings = ratings.count()
numUsers = ratings.map(lambda r: r[0]).distinct().count()
numMovies = ratings.map(lambda r: r[1]).distinct().count()

print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

MapPartitionsRDD[297] at javaToPython at NativeMethodAccessorImpl.java:0
Got 99999 ratings from 943 users on 1682 movies.


In [60]:
# read movies CSV
movies_df = spark.read.csv('../project4/ml-100k/u.item',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep="|",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

movies_df=movies_df.withColumnRenamed("10","movie_id")
movies_df.printSchema() 

root
 |-- movie_id: integer (nullable = true)
 |-- Toy Story (1995): string (nullable = true)
 |-- 01-Jan-1995: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- http://us.imdb.com/M/title-exact?Toy%20Story%20(1995): string (nullable = true)
 |-- 05: integer (nullable = true)
 |-- 06: integer (nullable = true)
 |-- 07: integer (nullable = true)
 |-- 18: integer (nullable = true)
 |-- 19: integer (nullable = true)
 |-- 110: integer (nullable = true)
 |-- 011: integer (nullable = true)
 |-- 012: integer (nullable = true)
 |-- 013: integer (nullable = true)
 |-- 014: integer (nullable = true)
 |-- 015: integer (nullable = true)
 |-- 016: integer (nullable = true)
 |-- 017: integer (nullable = true)
 |-- 018: integer (nullable = true)
 |-- 019: integer (nullable = true)
 |-- 020: integer (nullable = true)
 |-- 021: integer (nullable = true)
 |-- 022: integer (nullable = true)
 |-- 023: integer (nullable = true)



In [64]:
movies_counts = ratings_df.groupBy("242").count()
movies_counts.show()

+----+-----+
| 242|count|
+----+-----+
| 496|  231|
| 471|  221|
| 463|   71|
| 148|  128|
|1342|    2|
| 833|   49|
|1088|   13|
|1591|    6|
|1238|    8|
|1580|    1|
|1645|    1|
| 392|   68|
| 623|   39|
| 540|   43|
| 858|    3|
| 737|   59|
| 243|  132|
|1025|   44|
|1084|   21|
|1127|   11|
+----+-----+
only showing top 20 rows



In [67]:
training_df, validation_df, test_df = ratings_df.randomSplit([.6, .2, .2])
training_df

DataFrame[196: int, 242: int, 3: int, 881250949: int]

In [68]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import Row
import numpy as np
import math

In [69]:
#These are the diffrenet parameters which we can tune for the ALS model.
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = range(4, 12)
errors = []
err = 0
tolerance = 0.02

In [75]:
#####Let's see which rank value will be the most optimized one .
min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=rank, userCol="196", itemCol="242", ratingCol="3")
    model = als.fit(training_df)
    predictions = model.transform(validation_df)
    print(type(predictions))
    predictions.describe()
    print(predictions.head(5))
    new_predictions = predictions[predictions['prediction'] != np.NaN]
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="3", predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    errors.append(rmse)

    print('For rank %s the RMSE is %s' % (rank, rmse))
    if rmse < min_error:
        min_error = rmse
        best_rank = rank
print('The best model was trained with rank %s' % best_rank)

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(196=606, 242=148, 3=3, 881250949=878150506, prediction=3.5392303466796875), Row(196=663, 242=148, 3=4, 881250949=889492989, prediction=3.1617956161499023), Row(196=222, 242=148, 3=2, 881250949=881061164, prediction=3.130164384841919), Row(196=330, 242=148, 3=4, 881250949=876544781, prediction=4.114264011383057), Row(196=224, 242=148, 3=3, 881250949=888104154, prediction=3.5527002811431885)]
For rank 4 the RMSE is 0.942490275965
<class 'pyspark.sql.dataframe.DataFrame'>
[Row(196=606, 242=148, 3=3, 881250949=878150506, prediction=3.5870258808135986), Row(196=663, 242=148, 3=4, 881250949=889492989, prediction=3.195436954498291), Row(196=222, 242=148, 3=2, 881250949=881061164, prediction=3.135669231414795), Row(196=330, 242=148, 3=4, 881250949=876544781, prediction=3.890799045562744), Row(196=224, 242=148, 3=3, 881250949=888104154, prediction=3.3917620182037354)]
For rank 5 the RMSE is 0.941244095705
<class 'pyspark.sql.dataframe.DataFrame'>
[

### Rank = 4 has the rmse = 0.942490275965