In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
# Creating RDD's

movieRDD = sc.textFile("/Users/pravinkumar/Documents/Spark/testData/Movie-Rating\ Query\ Exercises/Movie.txt")
ratingRDD = sc.textFile("/Users/pravinkumar/Documents/Spark/testData/Movie-Rating\ Query\ Exercises/Rating.txt")
reviewerRDD = sc.textFile("/Users/pravinkumar/Documents/Spark/testData/Movie-Rating\ Query\ Exercises/Reviewer.txt")

for i in movieRDD.take(2): print(i)
for i in ratingRDD.take(2): print(i)
for i in reviewerRDD.take(2): print(i)

101,Gone with the Wind,1939,Victor Fleming
102,Star Wars,1977,George Lucas
201,101,2,2011-01-22
201,101,4,2011-01-27
201,Sarah Martinez
202,Daniel Lewis


In [5]:
# Creating DF's
from pyspark.sql import Row
movieDF = movieRDD.map(lambda rec: rec.split(",")).map(lambda rec: Row(mID = rec[0], title = rec[1], year = rec[2], \
                                                                       director = rec[3])).toDF()
reviewerDF = reviewerRDD.map(lambda rec: rec.split(",")).map(lambda rec: Row(rID = rec[0], name = rec[1])).toDF()
ratingDF = ratingRDD.map(lambda rec: rec.split(",")).map(lambda rec: Row(rID = rec[0], mID = rec[1], stars = rec[2], ratingDate = rec[3])).toDF()

movieDF.registerTempTable("movie")
reviewerDF.registerTempTable("reviewer")
ratingDF.registerTempTable("rating")

movieDF.limit(2).show()
ratingDF.limit(2).show()
reviewerDF.limit(2).show()

+--------------+---+------------------+----+
|      director|mID|             title|year|
+--------------+---+------------------+----+
|Victor Fleming|101|Gone with the Wind|1939|
|  George Lucas|102|         Star Wars|1977|
+--------------+---+------------------+----+

+---+---+----------+-----+
|mID|rID|ratingDate|stars|
+---+---+----------+-----+
|101|201|2011-01-22|    2|
|101|201|2011-01-27|    4|
+---+---+----------+-----+

+--------------+---+
|          name|rID|
+--------------+---+
|Sarah Martinez|201|
|  Daniel Lewis|202|
+--------------+---+



In [6]:
# Query 01 - Find the titles of all movies directed by Steven Spielberg.

Query01RDD = movieRDD.map(lambda rec: rec.split(",")).filter(lambda rec: rec[3] == "Steven Spielberg").\
map(lambda rec: rec[1])
for i in Query01RDD.collect(): print(i)
    
sqlContext.sql("select m.title from movie m where m.director = 'Steven Spielberg'").show()

E.T.
Raiders of the Lost Ark
+--------------------+
|               title|
+--------------------+
|                E.T.|
|Raiders of the Lo...|
+--------------------+



In [7]:
# Query 02 - Find all years that have a movie that received a rating of 4 or 5, and sort them in increasing order.


# Trimming down the rating RDD according to the Query
ratingTrim = ratingRDD.map(lambda rec: rec.split(",")).filter(lambda rec: rec[2] == '4' or rec[2] == '5').\
map(lambda rec: (rec[1], rec[2]))
movieTrim = movieRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[2]))

Query02RDD = ratingTrim.join(movieTrim).map(lambda rec: rec[1][1]).distinct(numPartitions = 1)
for i in Query02RDD.collect(): print(i)

sqlContext.sql("select distinct m.year from movie m, rating r where m.mID = r.mID and r.stars in ('4', '5')").show()

1939
1937
1981
2009
+----+
|year|
+----+
|1981|
|2009|
|1939|
|1937|
+----+



In [8]:
# Query 03 - Find the titles of all movies that have no ratings. 

movieTrim = movieRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
ratingTrim = ratingRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[1], rec[2]))
Query03RDD = movieTrim.subtractByKey(ratingTrim).map(lambda rec: rec[1])

for i in Query03RDD.collect(): print(i)
    
sqlContext.sql("select distinct m.title from movie m where m.mID not in (select r.mID from rating r)").show()

Star Wars
Titanic
+---------+
|    title|
+---------+
|Star Wars|
|  Titanic|
+---------+



In [9]:
# Query 04 - Some reviewers didn't provide a date with their rating. 
# Find the names of all reviewers who have ratings with a NULL value for the date. 

ratingTrim = ratingRDD.map(lambda rec: rec.split(",")).filter(lambda rec: rec[3] == 'null').\
map(lambda rec: (rec[0], rec[3]))
#for i in ratingTrim.collect(): print(i)
reviewerTrim = reviewerRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
#for i in reviewerTrim.collect(): print(i)

Query04RDD = ratingTrim.join(reviewerTrim).map(lambda rec: rec[1][1])
for i in Query04RDD.collect(): print(i)
    
sqlContext.sql("select e.name from reviewer e where e.rID in (select r.rID from rating r where r.ratingDate == 'null')").\
show()

Daniel Lewis
Chris Jackson
+-------------+
|         name|
+-------------+
|Chris Jackson|
| Daniel Lewis|
+-------------+



In [14]:
# Query 05 - Write a query to return the ratings data in a more readable format: 
# reviewer name, movie title, stars, and ratingDate. Also, sort the data, 
# first by reviewer name, then by movie title, and lastly by number of stars. 

movieTrim  = movieRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
reviewerTrim = reviewerRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
ratingTrim = ratingRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0],(rec[1], rec[2], rec[3])))
ratingJoinReviewer = reviewerTrim.join(ratingTrim).map(lambda rec: (rec[1][1][0], (rec[1][0], rec[1][1][1], rec[1][1][2])))
movieJoinrating = movieTrim.join(ratingJoinReviewer).map(lambda rec: ((rec[1][1][0], rec[1][0], rec[1][1][1]), rec[1][1][2]))

Query05RDD = movieJoinrating.sortBy(lambda rec: rec[0], ascending = True).\
sortByKey(ascending = True).map(lambda rec: [rec[0][0], rec[0][1], rec[0][2], rec[1]])
for i in Query05RDD.collect(): print(i)

    
sqlContext.sql("select e.name, m.title, r.stars, r.ratingDate from movie m, reviewer e, rating r where \
e.rID = r.rID and r.mID = m.mID order by e.name, m.title, r.stars asc").show()

['Ashley White', 'E.T.', '3', '2011-01-02']
['Brittany Harris', 'Raiders of the Lost Ark', '2', '2011-01-30']
['Brittany Harris', 'Raiders of the Lost Ark', '4', '2011-01-12']
['Brittany Harris', 'The Sound of Music', '2', '2011-01-20']
['Chris Jackson', 'E.T.', '2', '2011-01-22']
['Chris Jackson', 'Raiders of the Lost Ark', '4', 'null']
['Chris Jackson', 'The Sound of Music', '3', '2011-01-27']
['Daniel Lewis', 'Snow White', '4', 'null']
['Elizabeth Thomas', 'Avatar', '3', '2011-01-15']
['Elizabeth Thomas', 'Snow White', '5', '2011-01-19']
['James Cameron', 'Avatar', '5', '2011-01-20']
['Mike Anderson', 'Gone with the Wind', '3', '2011-01-09']
['Sarah Martinez', 'Gone with the Wind', '2', '2011-01-22']
['Sarah Martinez', 'Gone with the Wind', '4', '2011-01-27']
+----------------+--------------------+-----+----------+
|            name|               title|stars|ratingDate|
+----------------+--------------------+-----+----------+
|    Ashley White|                E.T.|    3|2011-01-02|

In [6]:
# Exercise @ https://lagunita.stanford.edu/courses/DB/SQL/SelfPaced/courseware/ch-sql/seq-exercise-sql_movie_query_core/

# Here's the schema: 

# Movie ( mID, title, year, director ) 
# English: There is a movie with ID number mID, a title, a release year, and a director. 

# Reviewer ( rID, name ) 
# English: The reviewer with ID number rID has a certain name. 

# Rating ( rID, mID, stars, ratingDate ) 
# English: The reviewer rID gave the movie mID a number of stars rating (1-5) on a certain ratingDate. 

In [24]:
# Query 06 - For all cases where the same reviewer rated the same movie twice and gave it a higher 
# rating the second time, return the reviewer's name and the title of the movie.

# Broadcasting datas
movieTrim = movieRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
movieBC = sc.broadcast(movieTrim.collectAsMap())
ratingTrim = ratingRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], (rec[1], rec[2]))).groupByKey()
ratingBC = sc.broadcast(ratingTrim.collectAsMap())
reviewerTrim = reviewerRDD.map(lambda rec: rec.split(",")).map(lambda rec: (rec[0], rec[1]))
reviewerBC = sc.broadcast(reviewerTrim.collectAsMap())

# Quering Data
def checkMovie(rec):
    movieNStar = ratingBC.value.get(rec, None)
    movieMap = map()
    movieMap['0'] = '0'
    for i in movieNStar:
        if movieNStar == None:
            break
        movie, star = movieNStar
        if int(movieMap[movie]) < int(star):
            return [reviewerBC.value.get(rec, None), movieBC.value.get(movie, None)]
        movieMap[movie] = star
        
        


Query06RDD = ratingTrim.map(lambda rec: checkMovie(rec[0]))
for i in Query06RDD.collect(): print(i)


#sqlContext.sql("select e.name, m.title from rating r, reviewer e, movie m where ")

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 210.0 failed 1 times, most recent failure: Lost task 0.0 in stage 210.0 (TID 2786, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-24-43270ac6cca4>", line 28, in <lambda>
  File "<ipython-input-24-43270ac6cca4>", line 15, in checkMovie
TypeError: map() must have at least two arguments.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor105.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/pravinkumar/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-24-43270ac6cca4>", line 28, in <lambda>
  File "<ipython-input-24-43270ac6cca4>", line 15, in checkMovie
TypeError: map() must have at least two arguments.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# Query 07 - For each movie that has at least one rating, find the highest number of stars that movie received. 
# Return the movie title and number of stars. Sort by movie title.

In [None]:
# Query 08 - For each movie, return the title and the 'rating spread', that is, 
# the difference between highest and lowest ratings given to that movie. 
# Sort by rating spread from highest to lowest, then by movie title. 

In [None]:
# Query 09 - Find the difference between the average rating of movies released before 1980 
# and the average rating of movies released after 1980. (Make sure to calculate the average rating 
# for each movie, then the average of those averages for movies before 1980 and movies after. 
# Don't just calculate the overall average rating before and after 1980.) 

In [None]:
# Query 10 - 

In [None]:
# Query 11 - 