## Use Spark (RDD API)

In [None]:
from pyspark import SparkConf, SparkContext

In [None]:
def loadMovieNames(data):
  """Build a movieID -> title lookup from a pipe-delimited item file.

  Args:
    data: Path of the file to read. Every non-blank line must look like
      ``<integer id>|<title>|...``; only the first two fields are used.

  Returns:
    dict mapping the integer ID (first field) to the title (second field).

  Raises:
    OSError: the file cannot be opened.
    ValueError: the first field of a non-blank line is not an integer.
    IndexError: a non-blank line has no ``|`` separator.
  """
  movieNames = {}
  # The file is decoded as latin-1 (presumably because some titles contain
  # non-ASCII characters -- confirm against the data set).
  with open(data, encoding = 'latin-1') as f:
    for line in f:
      # Blank lines (e.g. a stray empty line at the end of the file) hold no
      # record; without this guard int('') would abort the whole load.
      if not line.strip():
        continue
      fields = line.split('|')
      movieNames[int(fields[0])] = fields[1]
  return movieNames
def parseInput(line):
  """Turn one whitespace-delimited ratings record into a keyed pair.

  The second token is read as the integer movie ID and the third as the
  float rating. The result is ``(movieID, (rating, 1.0))``; the constant
  1.0 lets an element-wise sum of the value tuples produce
  (rating total, rating count).
  """
  tokens = line.split()
  movie_id = int(tokens[1])
  rating = float(tokens[2])
  return (movie_id, (rating, 1.0))

In [None]:
# Reuse the SparkContext that is already running in this kernel, or start one.
# Without this the cell fails with NameError on a fresh kernel, because no
# earlier cell defines `sc`.
sc = SparkContext.getOrCreate()

# Small in-memory (key, value) data set used to demonstrate reduceByKey
data = sc.parallelize([("coffee", 1), ("coffee", 2), ("tea", 1), ("coffee", 3)])

# Reduction function for reduceByKey: merges two values that share a key
def sum_values(x, y):
  """Return ``x + y``."""
  total = x + y
  return total

# Apply reduceByKey with the sum function
reduced_data = data.reduceByKey(sum_values)

# Print the result on the driver. rdd.foreach(print) would run print() inside
# the executor processes, so nothing would show up in the notebook output.
# collect() is fine here because the reduced data set is tiny.
# (The order of the pairs may vary with partitioning.)
for pair in reduced_data.collect():
  print(pair)

In [None]:
# Divide the first element of each value tuple by the second
# (rating total / rating count) to get the mean rating per movieID
averageRatings = ratingTotalsAndCount.mapValues(lambda sumAndCount: sumAndCount[0] / sumAndCount[1])

# Order ascending by mean rating, so the lowest-rated movies come first
sortedMovies = averageRatings.sortBy(lambda movieAndAvg: movieAndAvg[1])

# Keep the first 10 entries of that ordering
results = sortedMovies.take(10)

# Show the title and the mean rating of each entry
for movie_id, avg_rating in results:
  print(movieNames[movie_id], avg_rating)


Amityville: Dollhouse (1996) 1.0
Somebody to Love (1994) 1.0
Every Other Weekend (1990) 1.0
Homage (1995) 1.0
3 Ninjas: High Noon At Mega Mountain (1998) 1.0
Bird of Prey (1996) 1.0
Power 98 (1995) 1.0
Beyond Bedlam (1993) 1.0
Falling in Love Again (1980) 1.0
T-Men (1947) 1.0


Do not launch the Spark job with the plain `python` interpreter; save the code below as a script and run it with `spark-submit`.

In [None]:
if __name__=="__main__":

  # Define the Spark context. getOrCreate() reuses a context that is already
  # running in this process; calling SparkContext(conf = conf) a second time
  # raises "ValueError: Cannot run multiple SparkContexts at once".
  # Note: when a context already exists, `conf` is not applied to it.
  conf = SparkConf().setAppName("WorstMovies")
  sc = SparkContext.getOrCreate(conf = conf)

  # Load up the movieID -> title lookup
  movieNames  = loadMovieNames('u.item')

  # Load the raw u.data file (could be spread over a whole cluster)
  #lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
  lines = sc.textFile("u.data")

  # Convert to (movieID, (rating, 1.0))
  movieRatings = lines.map(parseInput)

  # Reduce to (movieID, (sumOfRatings, totalRatings))
  ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: (movie1[0] + movie2[0], movie1[1] + movie2[1]))

  # Keep only movies with more than 9 ratings, so a single 1-star vote cannot
  # put a movie at the top of the "worst" list
  vc = ratingTotalsAndCount.filter(lambda movie: movie[1][1] > 9)

  # Map to (movieID, averageRating). This must start from the filtered RDD
  # `vc`; averaging `ratingTotalsAndCount` here would silently discard the
  # filter above.
  averageRatings = vc.mapValues(lambda totalAndCount: totalAndCount[0]/totalAndCount[1])

  # Sort ascending by average rating (worst first)
  sortedMovies = averageRatings.sortBy(lambda x:x[1])

  # Take the 10 lowest-rated movies
  results = sortedMovies.take(10)

  # Print title and average rating
  for result in results:
    print(movieNames[result[0]], result[1])




ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=WorstMovies, master=local[*]) created by __init__ at <ipython-input-11-b92979506560>:2 

## Use Spark SQL and DataFrames

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [None]:
def loadMovieNames(data):
  """Read a pipe-delimited item file into a ``{movieID: title}`` dict.

  Args:
    data: Path of the item file. The first field of each record is parsed
      as the integer movie ID, the second field is kept as the title.

  Returns:
    dict of int movie ID -> title string.

  Raises:
    OSError: the file cannot be opened.
    ValueError: a non-blank line does not start with an integer field.
    IndexError: a non-blank line contains no ``|``.
  """
  movieNames = {}
  with open(data, encoding = 'latin-1') as f:
    for line in f:
      # Skip blank / whitespace-only lines instead of failing on int('')
      if not line.strip():
        continue
      fields = line.split("|")
      movieNames[int(fields[0])] = fields[1]
  return movieNames

def parseInput(line):
  """Convert one whitespace-delimited ratings line into a ``Row``.

  The second token becomes the integer ``movieID`` field and the third
  token the float ``rating`` field.
  """
  tokens = line.split()
  movie_id = int(tokens[1])
  score = float(tokens[2])
  # The keyword names are the field names of the resulting Row
  return Row(movieID = movie_id, rating = score)

In [None]:
if __name__ == "__main__":
  # Obtain (or reuse) the SparkSession for this job
  spark = SparkSession.builder.appName("PopularMovie").getOrCreate()

  # movieID -> title lookup, held on the driver
  movieNames = loadMovieNames('u.item')

  # Raw rating lines as an RDD of strings
  lines = spark.sparkContext.textFile("u.data")

  # One Row(movieID, rating) per line
  movies = lines.map(parseInput)

  # Turn the RDD of Rows into a DataFrame
  movieDataset = spark.createDataFrame(movies)

  # Mean rating per movieID (result column is named "avg(rating)")
  averageRatings = movieDataset.groupBy("movieID").avg("rating")

  # Number of ratings per movieID (result column is named "count")
  counts = movieDataset.groupBy("movieID").count()

  # Join on movieID; columns are now movieID, count, avg(rating)
  averageAndCounts = counts.join(averageRatings, on = "movieID")

  # Give the columns short names: col1 = movieID, col2 = count, col3 = avg(rating)
  data_with_names = (
      averageAndCounts
      .withColumnRenamed("movieID", "col1")
      .withColumnRenamed("count", "col2")
      .withColumnRenamed("avg(rating)", "col3")
  )

  # Keep only movies that were rated more than 9 times
  data_with_names = data_with_names.where(data_with_names["col2"] > 9)

  # Lowest average first, ties broken by the smaller rating count; keep 10 rows
  topten = data_with_names.sort(
      data_with_names["col3"].asc(),
      data_with_names["col2"].asc(),
  ).take(10)

  # Print title, rating count and average rating for each row
  for movie in topten:
    print(movieNames[movie["col1"]], movie["col2"], movie["col3"])


Children of the Corn: The Gathering (1996) 19 1.3157894736842106
Body Parts (1991) 13 1.6153846153846154
Amityville II: The Possession (1982) 14 1.6428571428571428
Bloodsport 2 (1995) 10 1.7
Lawnmower Man 2: Beyond Cyberspace (1996) 21 1.7142857142857142
Robocop 3 (1993) 11 1.7272727272727273
Free Willy 3: The Rescue (1997) 27 1.7407407407407407
Kazaam (1996) 10 1.8
Gone Fishin' (1997) 11 1.8181818181818181
Solo (1996) 12 1.8333333333333333


In [None]:
# Re-create the joined DataFrame and list its column names
averageAndCounts = counts.join(averageRatings, on = "movieID")
print(averageAndCounts.columns)

['movieID', 'count', 'avg(rating)']


PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got slice.

## Use `MLlib` to build a recommendation system

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit


In [None]:
# load movies
def loadMovieNames(data):
  """Load the movieID -> title mapping from a pipe-delimited item file.

  Args:
    data: Path of the item file; each record starts with
      ``<integer id>|<title>``.

  Returns:
    dict keyed by the integer movie ID with the title string as value.

  Raises:
    OSError: the file cannot be opened.
    ValueError: the ID field of a non-blank line is not an integer.
    IndexError: a non-blank line has fewer than two ``|``-separated fields.
  """
  movieNames = {}
  with open(data, encoding = 'latin-1') as f:
    for line in f:
      # Ignore blank lines so an empty line in the file does not raise
      # ValueError from int('')
      if not line.strip():
        continue
      fields = line.split("|")
      movieNames[int(fields[0])] = fields[1]
  return movieNames

def parseInput(line):
  """Convert one record with a ``value`` text attribute into a ratings ``Row``.

  ``line.value`` is split on whitespace; tokens 0, 1 and 2 become the
  ``userID`` (int), ``movieID`` (int) and ``rating`` (float) fields.
  """
  tokens = line.value.split()
  user_id = int(tokens[0])
  movie_id = int(tokens[1])
  score = float(tokens[2])
  return Row(userID = user_id, movieID = movie_id, rating = score)


In [None]:
from pickle import load
if __name__ == "__main__":
  # Create a SparkSession
  spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

  # Load up the movieID -> title lookup
  movieNames = loadMovieNames('u.item')

  # Read the raw data. spark.read.text gives a DataFrame whose rows expose the
  # line text as `.value`; .rdd drops down to the RDD of those rows so that
  # parseInput can be mapped over it.
  lines = spark.read.text("u.data").rdd

  # Convert it to an RDD of Row objects with (userID, movieID, rating)
  ratingsRDD = lines.map(parseInput)

  # Convert to a DataFrame and cache it, because it is used more than once below
  ratings = spark.createDataFrame(ratingsRDD).cache()

  # Create an ALS collaborative filtering model from the complete data set.
  # A fixed seed makes the fitted model, and therefore the recommendations,
  # repeatable across runs.
  als = ALS(maxIter = 5, regParam = 0.01, userCol = "userID", itemCol = "movieID",
            ratingCol = "rating", seed = 42)
  model = als.fit(ratings)

  # Print out the ratings already given by user 0
  userRatings = ratings.filter("userID = 0")
  for rating in userRatings.collect():
    print(movieNames[rating['movieID']], rating['rating'])

  print("\nTop 20 recommendations:")

  # Find movies rated more than 100 times
  ratingCounts = ratings.groupBy("movieID").count().filter("count>100")

  # Construct a "test" DataFrame for user 0 with every movie rated more than
  # 100 times: the constant userID column makes the model predict user 0's
  # rating for each of those movies.
  popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))

  # Predict, then order by highest prediction (ties: lowest movieID first)
  recommendations = model.transform(popularMovies)
  top20 = recommendations.orderBy(recommendations.prediction.desc(), recommendations.movieID.asc()).take(20)

  # Read the fields by name rather than by position, and avoid `re` as the
  # loop variable since it shadows the standard-library module name.
  for rec in top20:
    print(movieNames[rec['movieID']], rec['prediction'])



Star Wars (1977) 5.0
Empire Strikes Back, The (1980) 5.0
Gone with the Wind (1939) 1.0

Top 20 recommendations:


In [None]:
  # Highest predicted rating first, ties broken by ascending movieID; keep 20 rows
  top20 = recommendations.sort(
      recommendations["prediction"].desc(),
      recommendations["movieID"].asc(),
  ).take(20)

  # First field of each row is looked up as the title, third field is printed as the score
  for rec in top20:
    print(movieNames[rec[0]], rec[2])

Ace Ventura: Pet Detective (1994) 6.27745246887207
Nightmare on Elm Street, A (1984) 5.941840648651123
Princess Bride, The (1987) 5.611507415771484
Army of Darkness (1993) 5.588329315185547
Beavis and Butt-head Do America (1996) 5.561034679412842
Austin Powers: International Man of Mystery (1997) 5.423329830169678
Clerks (1994) 5.343860149383545
Star Trek: The Wrath of Khan (1982) 5.244342803955078
Blues Brothers, The (1980) 5.182976722717285
Terminator, The (1984) 5.093161106109619
Die Hard (1988) 5.047311305999756
Seven (Se7en) (1995) 5.028848648071289
Star Wars (1977) 4.998826026916504
Happy Gilmore (1996) 4.973372936248779
Highlander (1986) 4.960333824157715
Batman (1989) 4.954259395599365
Empire Strikes Back, The (1980) 4.953100681304932
Mystery Science Theater 3000: The Movie (1996) 4.9426374435424805
Monty Python and the Holy Grail (1974) 4.932487487792969
Aliens (1986) 4.852281093597412


In [None]:
# Ratings on record for user 0, printed as "<title> <rating>"
userRatings = ratings.where("userID = 0")
for row in userRatings.collect():
    print(movieNames[row['movieID']], row['rating'])

[Row(movieID=474, count=194, avg(rating)=4.252577319587629),
 Row(movieID=29, count=114, avg(rating)=2.6666666666666665),
 Row(movieID=26, count=73, avg(rating)=3.452054794520548),
 Row(movieID=964, count=9, avg(rating)=3.3333333333333335),
 Row(movieID=65, count=115, avg(rating)=3.5391304347826087),
 Row(movieID=191, count=276, avg(rating)=4.163043478260869),
 Row(movieID=1224, count=12, avg(rating)=2.6666666666666665),
 Row(movieID=558, count=70, avg(rating)=3.6714285714285713),
 Row(movieID=1010, count=44, avg(rating)=3.25),
 Row(movieID=418, count=129, avg(rating)=3.5813953488372094)]