In [1]:
from pyspark import SparkContext,SparkConf
sc = SparkContext.getOrCreate()

In [2]:
sc

In [3]:
distData = sc.parallelize([1, 2, 3, 4])
distData

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:247

In [5]:
distData.collect()

[1, 2, 3, 4]

In [6]:
#Basic statistics using spark RDD
from random import random
l = [random() for _ in range(10)]
print(l)

[0.6995628857024389, 0.3641430697300171, 0.24819987353145323, 0.912245138682499, 0.2146589097157564, 0.815968474766547, 0.3795791568538104, 0.42816534013242435, 0.10348170707991067, 0.7917594692047765]


In [7]:
rdd = sc.parallelize(l, numSlices=4) # no of partitions = 4

In [8]:
rdd.max(), rdd.sum()

(0.912245138682499, 4.957764025399634)

In [9]:
rdd.stats()

(count: 10, mean: 0.49577640253996336, stdev: 0.2710573331083842, max: 0.912245138682499, min: 0.10348170707991067)

Exercise 1:
1. Create a directory in HDFS called movielens and load movies.csv and ratings.csv from Movies25M directory.
2. Load movies.csv into in moviesRdd
3. Load ratings.csv into ratingsRdd
4. Find out 10 top movies based on the highest average rating. Consider only those movies that have got at least 100 ratings.

In [16]:
movies = sc.textFile("Movies25M/movies.csv")
movies.count()

62424

In [17]:
for r in movies.take(10):
    print(r)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


In [18]:
moviesData = movies.filter(lambda line: not line.startswith("movieId"))
for r in moviesData.take(10):
    print(r)

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [23]:
ratings = sc.textFile("Movies25M/ratings.csv")
ratings.count()

25000096

In [24]:
for r in ratings.take(10):
    print(r)

userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
1,1175,3.5,1147868826
1,1217,3.5,1147878326
1,1237,5.0,1147868839


In [25]:
ratingsData = ratings.filter(lambda line: not line.startswith("userId"))
for r in ratingsData.take(10):
    print(r)

1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
1,1175,3.5,1147868826
1,1217,3.5,1147878326
1,1237,5.0,1147868839
1,1250,4.0,1147868414


In [28]:
movies.first()

'movieId,title,genres'

In [29]:
ratings.first()

'userId,movieId,rating,timestamp'

In [30]:
movies_by_movieid = moviesData\
.map(lambda line: line.split(","))\
.map(lambda tokens: (int(tokens[0]), tokens[1]))

movies_by_movieid.first()

(1, 'Toy Story (1995)')

In [31]:
ratings_by_movieid = ratingsData\
.map(lambda line: line.split(","))\
.map(lambda tokens: (int(tokens[1]), float(tokens[2])))

ratings_by_movieid.first()

(296, 5.0)

In [32]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)\
.sortBy(lambda p: p[1], False)\
.take(10)

for m in top_10:
    print(m)

((171011, 'Planet Earth II (2016)'), (4.483096085409253, 1124))
((159817, 'Planet Earth (2006)'), (4.464796794504865, 1747))
((318, '"Shawshank Redemption'), (4.413576004516335, 81482))
((170705, 'Band of Brothers (2001)'), (4.398598820058997, 1356))
((171495, 'Cosmos'), (4.3267148014440435, 277))
((858, '"Godfather'), (4.324336165187245, 52498))
((179135, 'Blue Planet II (2017)'), (4.289833080424886, 659))
((50, '"Usual Suspects'), (4.284353213163313, 55366))
((198185, 'Twin Peaks (1989)'), (4.267361111111111, 288))
((1221, '"Godfather: Part II'), (4.2617585117585115, 34188))


In [33]:
movies_by_movieid.join(ratings_by_movieid)\
.first()

(23, ('Assassins (1995)', 4.0))

In [34]:
movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.first()

((23, 'Assassins (1995)'), 4.0)

In [35]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)

top_10.first()

((391, "Jason's Lyric (1994)"), (3.1876574307304786, 397))

In [36]:
top_10 = movies_by_movieid.join(ratings_by_movieid)\
.map(lambda p: ((p[0], p[1][0]), p[1][1]))\
.groupByKey().mapValues(lambda values: (sum(values)/len(values), len(values)))\
.filter(lambda p: p[1][1] >= 100)\
.sortBy(lambda p: p[1], False)\

for m in top_10.take(10):
    print(m)

((171011, 'Planet Earth II (2016)'), (4.483096085409253, 1124))
((159817, 'Planet Earth (2006)'), (4.464796794504865, 1747))
((318, '"Shawshank Redemption'), (4.413576004516335, 81482))
((170705, 'Band of Brothers (2001)'), (4.398598820058997, 1356))
((171495, 'Cosmos'), (4.3267148014440435, 277))
((858, '"Godfather'), (4.324336165187245, 52498))
((179135, 'Blue Planet II (2017)'), (4.289833080424886, 659))
((50, '"Usual Suspects'), (4.284353213163313, 55366))
((198185, 'Twin Peaks (1989)'), (4.267361111111111, 288))
((1221, '"Godfather: Part II'), (4.2617585117585115, 34188))
