In [2]:
%%time
#import movie ratings into RDD
ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
#import user details into RDD
userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
#import movie data into RDD
movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
#import genre data into RDD
genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")

Wall time: 145 ms


In [3]:
%%time
#split on delimiter functions
def splitRatingTab(line):
    line = line.split('\t')
    return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
def splitUserPipe(line):
    line = line.split('|')
    return (int(line[0]), line[3]) #(user, occupation)
def splitMoviePipe(line):
    line = line.split('|')
    return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])
def splitGenrePipe(line):
    line = line.split('|')    
    return (int(line[1]), line[0]) #(genreId, genre)

def listToIntElements(lst):
    """conver the boolean text ('0', '1') genre value to integers (0, 1)"""
    for cnt, _ in enumerate(lst):
        lst[cnt] = int(_)
    return lst

# Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
print ('ratingRDD:\n',ratingRDD.top(5))

# Transform to RDD as [(user, occupation)]
occupationRDD = userLines.map(splitUserPipe)
print ('occupationRDD:\n',occupationRDD.top(3))

# Transform to RDD as [(user, age)]
ageRDD = userLines.map(lambda line: (int(line.split('|')[0]), line.split('|')[1]))
print ('ageRDD:\n',ageRDD.top(3))

# Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
movieRDD = movieLines.map(splitMoviePipe)
print ('movieRDD:\n',movieRDD.top(3))

# Transform to RDD as [(genreId, genre)]
genreRDD = genreLines.map(lambda line: splitGenrePipe(line))
print ('genreRDD:\n',genreRDD.take(3))

ratingRDD:
 [(943, 1330, 3), (943, 1228, 3), (943, 1188, 3), (943, 1074, 4), (943, 1067, 2)]
occupationRDD:
 [(943, 'student'), (942, 'librarian'), (941, 'student')]
ageRDD:
 [(943, '22'), (942, '48'), (941, '20')]
movieRDD:
 [(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
genreRDD:
 [(0, 'unknown'), (1, 'Action'), (2, 'Adventure')]
Wall time: 16.6 s


In [101]:
# Create a genre dictionary with {genreID: [genre, genreID]}
with open('C:/SparkCourse/ml-100k/u.genre') as file:
    genre = {}
    for line in file:
        #each line is of type [genere, genreid]
        line = line.split('|')
        #convert genreid to int, to remove new line '\n' at the end of string
        genre[int(line[1])] = [line[0], int(line[1])]
    print (genre)

{0: ['unknown', 0], 1: ['Action', 1], 2: ['Adventure', 2], 3: ['Animation', 3], 4: ["Children's", 4], 5: ['Comedy', 5], 6: ['Crime', 6], 7: ['Documentary', 7], 8: ['Drama', 8], 9: ['Fantasy', 9], 10: ['Film-Noir', 10], 11: ['Horror', 11], 12: ['Musical', 12], 13: ['Mystery', 13], 14: ['Romance', 14], 15: ['Sci-Fi', 15], 16: ['Thriller', 16], 17: ['War', 17], 18: ['Western', 18]}


In [6]:
%%time
#join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; 
#then Transform to [(movieid,((userid, rating), genre) )]
joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)
print (joinRatingMovieGenres.take(2))

[(4, ((264, 3), [0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), (4, ((303, 5), [0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))]
Wall time: 8.77 s


In [7]:
%%time
#Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
#to Transform to [(occupation, ((1_ratingCount, genre)))]
transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1]))).cache()
print (joinRatingGenresOccup.take(2))

[('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
Wall time: 11.4 s


In [11]:
%%time
#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, [cntGenresRating]))]
totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), [(x[1][0]+y[1][0]), (x[1][1]+y[1][1])]))
print (totalRatingGenreCntByOccupation.take(2))

[('executive', (4382, [12, 920])), ('retired', (1315, [1, 291]))]
Wall time: 7.76 s


In [12]:
#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, [cntGenresRating]))]
totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), [(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11]), (x[1][12]+y[1][12]), (x[1][13]+y[1][13]), (x[1][14]+y[1][14]), (x[1][15]+y[1][15]), (x[1][16]+y[1][16]), (x[1][17]+y[1][17]), (x[1][18]+y[1][18])]))
print (totalRatingGenreCntByOccupation.take(2))

[('executive', (4382, [12, 920, 496, 122, 404, 1284, 319, 86, 1657, 70, 87, 349, 141, 261, 769, 438, 755, 199, 78])), ('retired', (1315, [1, 291, 155, 29, 107, 413, 90, 20, 500, 21, 32, 110, 38, 83, 226, 134, 233, 63, 26]))]


In [98]:
#using the genre dictionary defined outside to assign the genre values to every count
def calcPercent(line):
    '''calculate the percentage of each rating for the input (totalRatings, [%GenreRatings])
    update the genre dictionary from {genreId:('unknown', genreId)} to {genreId:('unknown', [prcntRating)}'''
    tot = line[0]
    lst = []
    for i, val in enumerate(line[1]):
        calc = round(float((val*100)/tot))
        lst.append(calc)
        genre[i][1] = calc
    return (tot, list(genre.values()))
    

In [100]:
#Transform totalRatingGenreCntByOccupation to [occupation, (totalRatings, [genre, %GenreRatings])]
prcntGenreRatingsByOccupation = totalRatingGenreCntByOccupation.map(lambda line: (line[0], calcPercent(line[1])))
print(prcntGenreRatingsByOccupation.take(2))

[('executive', (4382, [['unknown', 0], ['Action', 21], ['Adventure', 11], ['Animation', 3], ["Children's", 9], ['Comedy', 29], ['Crime', 7], ['Documentary', 2], ['Drama', 38], ['Fantasy', 2], ['Film-Noir', 2], ['Horror', 8], ['Musical', 3], ['Mystery', 6], ['Romance', 18], ['Sci-Fi', 10], ['Thriller', 17], ['War', 5], ['Western', 2]])), ('retired', (1315, [['unknown', 0], ['Action', 22], ['Adventure', 12], ['Animation', 2], ["Children's", 8], ['Comedy', 31], ['Crime', 7], ['Documentary', 2], ['Drama', 38], ['Fantasy', 2], ['Film-Noir', 2], ['Horror', 8], ['Musical', 3], ['Mystery', 6], ['Romance', 17], ['Sci-Fi', 10], ['Thriller', 18], ['War', 5], ['Western', 2]]))]


In [137]:
#Collect the RDD data into python object [(occupation, (tot, [[genre, genrePercent], [], ...]))] and print:
resultList = prcntGenreRatingsByOccupation.collect()

#print the header
print ('Occupation'.rjust(13),':', genre[0][0][:3],'|', genre[1][0][:3],'|', genre[2][0][:3],'|', genre[3][0][:3],'|', genre[4][0][:3],'|', genre[5][0][:3],'|', genre[6][0][:3],'|', genre[7][0][:3],'|', genre[8][0][:3],'|', genre[9][0][:3],'|', genre[10][0][:3],'|', genre[11][0][:3],'|', genre[12][0][:3],'|', genre[13][0][:3],'|', genre[14][0][:3],'|', genre[15][0][:3],'|', genre[16][0][:3],'|', genre[17][0][:3],'|', genre[18][0][:3])
print ('-'*127)

#print occupation: genreRatingPercentages in 2 digits
for i in resultList:
    print (i[0].rjust(13),':', str(i[1][1][0][1]).rjust(3),'|', str(i[1][1][1][1]).rjust(3),'|', str(i[1][1][2][1]).rjust(3),'|', str(i[1][1][3][1]).rjust(3),'|', str(i[1][1][4][1]).rjust(3),'|', str(i[1][1][5][1]).rjust(3),'|', str(i[1][1][6][1]).rjust(3),'|', str(i[1][1][7][1]).rjust(3),'|', str(i[1][1][8][1]).rjust(3),'|', str(i[1][1][9][1]).rjust(3),'|', str(i[1][1][10][1]).rjust(3),'|', str(i[1][1][11][1]).rjust(3),'|', str(i[1][1][12][1]).rjust(3),'|', str(i[1][1][13][1]).rjust(3),'|', str(i[1][1][14][1]).rjust(3),'|', str(i[1][1][15][1]).rjust(3),'|', str(i[1][1][16][1]).rjust(3),'|', str(i[1][1][17][1]).rjust(3),'|', str(i[1][1][18][1]).rjust(3))
    

   Occupation : unk | Act | Adv | Ani | Chi | Com | Cri | Doc | Dra | Fan | Fil | Hor | Mus | Mys | Rom | Sci | Thr | War | Wes
-------------------------------------------------------------------------------------------------------------------------------
    executive :   0 |  21 |  11 |   3 |   9 |  29 |   7 |   2 |  38 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  17 |   5 |   2
      retired :   0 |  20 |  12 |   3 |   9 |  30 |   7 |   2 |  40 |   2 |   2 |   8 |   3 |   6 |  17 |  10 |  16 |   5 |   2
     educator :   0 |  20 |  12 |   3 |   9 |  30 |   7 |   2 |  40 |   2 |   2 |   8 |   3 |   6 |  17 |  10 |  16 |   5 |   2
administrator :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  16 |   5 |   2
    homemaker :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  16 |   5 |   2
   programmer :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 

### Testing the reduceByKey() Transformation

In [139]:
rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])

In [140]:
result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
print (result.top(3))

[('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]


In [19]:
# Works because only 2 elements
rdd1 = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0]))])
result = rdd1.reduceByKey(lambda x, y: ((x[0]+y[0]),(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2])))
print (result.top(3))

[('librarian', (2, 0, 0, 2))]


In [16]:
# Fails : When you reduceByKey you have to return the same structure you have received, otherwise the next time you will meet 
# a value of the same key and will try to reduce it your function will not work
rdd2 = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])), ('librarian', (1, [0, 1, 0, 0]))])
result = rdd2.reduceByKey(lambda x, y: ((x[0]+y[0]),(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]) ))
print (result.top(3))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times, most recent failure: Lost task 1.0 in stage 39.0 (TID 68, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\spark\python\pyspark\rdd.py", line 1865, in _mergeCombiners
    merger.mergeCombiners(iterator)
  File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 272, in mergeCombiners
    d[k] = comb(d[k], v) if k in d else v
  File "<ipython-input-16-56c286a9920e>", line 3, in <lambda>
TypeError: 'int' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark\python\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\spark\python\pyspark\rdd.py", line 1865, in _mergeCombiners
    merger.mergeCombiners(iterator)
  File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 272, in mergeCombiners
    d[k] = comb(d[k], v) if k in d else v
  File "<ipython-input-16-56c286a9920e>", line 3, in <lambda>
TypeError: 'int' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [20]:
# Success : As reduceByKey returns the same structure you have received
rdd3 = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])), ('librarian', (1, [0, 1, 0, 0]))])
result = rdd3.reduceByKey(lambda x, y: ((x[0]+y[0]),[(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3])]))
print (result.top(3))

[('librarian', (3, [0, 2, 1, 0]))]


### Python Spark program for Analysis of Movie Ratings percentages across Occupation and Movie Genre 

In [3]:
%%time

##############################################################################
### Analysis of Movie Ratings percentages across Occupation and Movie Genre
##############################################################################

####from pyspark import SparkConf, SparkContext

####conf = SparkConf().SetMaster("local").setAppName("MovieCountByRatings")
####sc = SparkContext(conf = conf)

#import movie ratings into RDD
ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
#import user details into RDD
userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
#import movie data into RDD
movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
#import genre data into RDD
genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")

#split on delimiter functions
def splitRatingTab(line):
    line = line.split('\t')
    return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
def splitUserPipe(line):
    line = line.split('|')
    return (int(line[0]), line[3]) #(user, occupation)
def splitMoviePipe(line):
    line = line.split('|')
    return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])

def listToIntElements(lst):
    """conver the boolean text ('0', '1') genre value to integers (0, 1)"""
    for cnt, _ in enumerate(lst):
        lst[cnt] = int(_)
    return lst



# Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
#print ('ratingRDD:\n',ratingRDD.top(5))

# Transform to RDD as [(user, occupation)]
occupationRDD = userLines.map(splitUserPipe)
#print ('occupationRDD:\n',occupationRDD.top(3))

# Transform to RDD as [(user, age)]
ageRDD = userLines.map(lambda line: (int(line.split('|')[0]), line.split('|')[1]))
#print ('ageRDD:\n',ageRDD.top(3))

# Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
movieRDD = movieLines.map(splitMoviePipe)
#print ('movieRDD:\n',movieRDD.top(3))


# Create a genre dictionary with {genreID: [genre, genreID]}
with open('C:/SparkCourse/ml-100k/u.genre') as file:
    genre = {}
    for line in file:
        #each line is of type [genere, genreid]
        line = line.split('|')
        #convert genreid to int, to remove new line '\n' at the end of string
        genre[int(line[1])] = [line[0], int(line[1])]

        
#using the genre dictionary defined outside to assign the genre values to every count
def calcPercent(line):
    '''calculate the percentage of each rating for the input (totalRatings, [%GenreRatings])
    update the genre dictionary from {genreId:('unknown', genreId)} to {genreId:('unknown', [prcntRating)}'''
    tot = line[0]
    lst = []
    for i, val in enumerate(line[1]):
        calc = round(float((val*100)/tot))
        lst.append(calc)
        genre[i][1] = calc
    return (tot, list(genre.values()))


#join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; 
#then Transform to [(movieid,((userid, rating), genre) )]
joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)

#Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
#to Transform to [(occupation, ((1_ratingCount, genre)))]
transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1]))).cache()

#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, [cntGenresRating]))]
totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), [(x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11]), (x[1][12]+y[1][12]), (x[1][13]+y[1][13]), (x[1][14]+y[1][14]), (x[1][15]+y[1][15]), (x[1][16]+y[1][16]), (x[1][17]+y[1][17]), (x[1][18]+y[1][18])]))

#Transform totalRatingGenreCntByOccupation to [occupation, (totalRatings, [genre, %GenreRatings])]
prcntGenreRatingsByOccupation = totalRatingGenreCntByOccupation.map(lambda line: (line[0], calcPercent(line[1])))


#Collect the RDD data into python object [(occupation, (tot, [[genre, genrePercent], [], ...]))] and print:
resultList = prcntGenreRatingsByOccupation.collect()

#print the header
print ('Occupation'.rjust(13),':', genre[0][0][:3],'|', genre[1][0][:3],'|', genre[2][0][:3],'|', genre[3][0][:3],'|', genre[4][0][:3],'|', genre[5][0][:3],'|', genre[6][0][:3],'|', genre[7][0][:3],'|', genre[8][0][:3],'|', genre[9][0][:3],'|', genre[10][0][:3],'|', genre[11][0][:3],'|', genre[12][0][:3],'|', genre[13][0][:3],'|', genre[14][0][:3],'|', genre[15][0][:3],'|', genre[16][0][:3],'|', genre[17][0][:3],'|', genre[18][0][:3])
print ('-'*127)

#print occupation: genreRatingPercentages in 2 digits
for i in resultList:
    print (i[0].rjust(13),':', str(i[1][1][0][1]).rjust(3),'|', str(i[1][1][1][1]).rjust(3),'|', str(i[1][1][2][1]).rjust(3),'|', str(i[1][1][3][1]).rjust(3),'|', str(i[1][1][4][1]).rjust(3),'|', str(i[1][1][5][1]).rjust(3),'|', str(i[1][1][6][1]).rjust(3),'|', str(i[1][1][7][1]).rjust(3),'|', str(i[1][1][8][1]).rjust(3),'|', str(i[1][1][9][1]).rjust(3),'|', str(i[1][1][10][1]).rjust(3),'|', str(i[1][1][11][1]).rjust(3),'|', str(i[1][1][12][1]).rjust(3),'|', str(i[1][1][13][1]).rjust(3),'|', str(i[1][1][14][1]).rjust(3),'|', str(i[1][1][15][1]).rjust(3),'|', str(i[1][1][16][1]).rjust(3),'|', str(i[1][1][17][1]).rjust(3),'|', str(i[1][1][18][1]).rjust(3))
    

   Occupation : unk | Act | Adv | Ani | Chi | Com | Cri | Doc | Dra | Fan | Fil | Hor | Mus | Mys | Rom | Sci | Thr | War | Wes
-------------------------------------------------------------------------------------------------------------------------------
    executive :   0 |  21 |  11 |   3 |   9 |  29 |   7 |   2 |  38 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  17 |   5 |   2
      retired :   0 |  20 |  12 |   3 |   9 |  30 |   7 |   2 |  40 |   2 |   2 |   8 |   3 |   6 |  17 |  10 |  16 |   5 |   2
     educator :   0 |  20 |  12 |   3 |   9 |  30 |   7 |   2 |  40 |   2 |   2 |   8 |   3 |   6 |  17 |  10 |  16 |   5 |   2
administrator :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  16 |   5 |   2
    homemaker :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 |  10 |  16 |   5 |   2
   programmer :   0 |  21 |  12 |   3 |   9 |  29 |   7 |   2 |  39 |   2 |   2 |   8 |   3 |   6 |  18 