### Spark RDD

In [2]:
import os
from pyspark import SparkConf, SparkContext

In [3]:
def loadMovieNames(path=None, encoding='ISO-8859-1'):
    """Build a movie ID -> movie title lookup table from a MovieLens ``u.item`` file.

    Each line is pipe-delimited: field 0 is the integer movie ID and field 1
    is the title.

    Args:
        path: file to read; defaults to ``../data/ml-100k/u.item``.
        encoding: text encoding of the file. The ml-100k ``u.item`` file is
            Latin-1 (ISO-8859-1), not UTF-8, so relying on the platform default
            encoding can raise ``UnicodeDecodeError`` on accented titles.

    Returns:
        dict mapping int movie ID -> title string.
    """
    if path is None:
        path = os.path.join('..', 'data', 'ml-100k', 'u.item')

    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # tolerate blank/trailing lines instead of failing on int('')
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]

    return movieNames

In [4]:
def parseInput(line):
    """Turn one whitespace-separated u.data line into ``(movieID, (rating, 1.0))``.

    The trailing ``1.0`` is a per-rating count, so a single reduceByKey pass
    can accumulate both the rating sum and the number of ratings.
    """
    columns = line.split()
    movieID, rating = columns[1], columns[2]
    return int(movieID), (float(rating), 1.0)

In [1]:
def addRatingTotals(a, b):
    """Combine two ``(sumOfRatings, numberOfRatings)`` pairs element-wise.

    Used as the reduceByKey function. It must return the same pair shape it
    receives, because Spark feeds partial results back in as inputs.
    """
    return (a[0] + b[0], a[1] + b[1])


if __name__ == "__main__":
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf=conf)

    try:
        # load up our movie ID -> movie name lookup table
        movieNames = loadMovieNames()

        # load up the raw u.data file
        lines = sc.textFile(os.path.join('..', 'data', 'ml-100k', 'u.data'))
        # lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

        # convert to (movieID, (rating, 1.0))
        movieRatings = lines.map(parseInput)

        # reduce to (movieID, (sumOfRatings, totalRatings))
        ratingTotalsAndCount = movieRatings.reduceByKey(addRatingTotals)

        # map to (movieID, averageRating)
        averageRatings = ratingTotalsAndCount.mapValues(
            lambda totalAndCount: totalAndCount[0] / totalAndCount[1])

        # sort by average rating, ascending, so the lowest-rated movies come first
        sortedMovies = averageRatings.sortBy(lambda x: x[1])

        # take the 10 lowest-rated movies
        results = sortedMovies.take(10)

        # print them out
        for movieID, averageRating in results:
            print(movieNames[movieID], averageRating)
    finally:
        # release the context even on failure so later notebook cells can start Spark cleanly
        sc.stop()

### Spark DataFrames

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [8]:
def loadMovieNames(path=None, encoding='ISO-8859-1'):
    """Build a movie ID -> movie title lookup table from a MovieLens ``u.item`` file.

    Each line is pipe-delimited: field 0 is the integer movie ID and field 1
    is the title.

    Args:
        path: file to read; defaults to ``../data/ml-100k/u.item``.
        encoding: text encoding of the file. The ml-100k ``u.item`` file is
            Latin-1 (ISO-8859-1), not UTF-8, so relying on the platform default
            encoding can raise ``UnicodeDecodeError`` on accented titles.

    Returns:
        dict mapping int movie ID -> title string.
    """
    if path is None:
        path = os.path.join('..', 'data', 'ml-100k', 'u.item')

    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # tolerate blank/trailing lines instead of failing on int('')
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]

    return movieNames

In [9]:
def parseInput(line):
    """Convert one whitespace-separated u.data line into ``Row(movieID, rating)``."""
    tokens = line.split()
    return Row(
        movieID=int(tokens[1]),
        rating=float(tokens[2]),
    )

In [2]:
# create a SparkSession
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# load up our movie ID -> name dictionary
movieNames = loadMoiveNames()

# get the raw data
lines = spark.sparkContext.textFile(os.path.join('..', 'data', 'ml-100k', 'u.data'))
# lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

# convert it to a RDD of Row objects with (movieID, rating)
movies = lines.map(parseInput)

# convert that to a DataFrame
movieDataset = spark.createDataFrame(movies)

# compute average rating for each movieID
averageRatings = movieDataset.groupBy("movieID").avg("rating")

# compute count of ratings for each movieID
counts = movieDataset.groupby("movieID").count()

# join the two dataset
averageAndCounts = counts.join(averageRatings, "movieID")

# pull the top 10 results
topTen = averageAndCounts.orderBy("avg(rating)").take(10)

# print result
for movie in topTen:
    print(movieNames[movie[0]], movie[1], movie[2])
    
# stop the session
spark.stop()

### Movie Recommendations with MLlib

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

In [12]:
# load up movieID -> movie name dictionary
def loadMovieNames(path=None, encoding='ISO-8859-1'):
    """Build a movie ID -> movie title lookup table from a MovieLens ``u.item`` file.

    Each line is pipe-delimited: field 0 is the integer movie ID and field 1
    is the title.

    Args:
        path: file to read; defaults to ``../data/ml-100k/u.item``.
        encoding: text encoding of the file. The ml-100k ``u.item`` file is
            Latin-1 (ISO-8859-1), not UTF-8, so relying on the platform default
            encoding can raise ``UnicodeDecodeError`` on accented titles.

    Returns:
        dict mapping int movie ID -> title string.
    """
    if path is None:
        path = os.path.join('..', 'data', 'ml-100k', 'u.item')

    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            # tolerate blank/trailing lines instead of failing on int('')
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]

    return movieNames

In [13]:
# convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    """Convert one u.data record into ``Row(userID, movieID, rating)``.

    Accepts either a plain string (the elements of ``sparkContext.textFile``)
    or a Row carrying the text in a ``value`` field (the elements of
    ``spark.read.text(...).rdd``). Reading ``.value`` unconditionally raised
    AttributeError for plain strings.
    """
    text = line if isinstance(line, str) else line.value
    fields = text.split()

    return Row(userID=int(fields[0]), movieID=int(fields[1]), rating=float(fields[2]))

In [None]:
# create a SparkSession
spark = SparkSession.builder.appName("movieRecs").getOrCreate()

# load up our movie ID -> name dictionary
movieNames = loadMoiveNames()

# get the raw data
lines = spark.sparkContext.textFile(os.path.join('..', 'data', 'ml-100k', 'u.data'))
# lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

# convert it to a RDD of Row objects with (userID, movieID, rating)
ratingsRDD = lines.map(parseInput)

# convert to a DataFrame and cache it
ratings = spark.createDataFrame(ratingsRDD).cashe()

# creat an ALS collaborative filtering modle from the complete dataset
als = ALS(maxIter=5, regParma=0.01, userCol='userID', itemCol='movieID', ratingCol='rating')
model = als.fit(ratings)

# print out ratings from user 0:
print("\nRatings for user ID 0:")
userRatings = ratings.filter("userID = 0")
for rating in userRatings.collect():
    print(moiveNames[rating['movieID']], rating['rating'])
    
print("\nTop 20 recommendations:")
# find movies rated more than 100 times:
ratingCounts = ratings.groupby("movieID").count().filter("count > 100")

# construct a test dataframe for user 0 with every movie rated more than 100 times
popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))

# run model on that list of popular movies for user ID 0
recommendations = model.transform(popularMovies)

# get the top 20 movies with the highest predicted rating for this user
topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

for recommendation in topRecommendations:
    print(movieNames[recommendation['movieID']], recommendation['prediction'])
    
spark.stop()