In [1]:
import sys
import os

sys.path.insert(0, '/spark/python')
sys.path.insert(0, '/spark/python/lib/py4j-0.10.7-src.zip')

os.environ['SPARK_HOME'] = '/spark'

import pyspark
conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory","1g")
conf.set("spark.executor.memory","1g")
conf.set("spark.num.executors","2")
conf.set("spark.executor.cores","1")

sc = pyspark.SparkContext(conf=conf)

In [2]:
sc

In [3]:
!ls -lh /opt/spark-data/ml-latest/

total 1.2G
-rw-r--r-- 1 root root 9.6K Sep 26  2018 README.txt
-rw-r--r-- 1 root root 396M Sep 26  2018 genome-scores.csv
-rw-r--r-- 1 root root  18K Sep 26  2018 genome-tags.csv
-rw-r--r-- 1 root root 1.3M Sep 26  2018 links.csv
-rw-r--r-- 1 root root 2.8M Sep 26  2018 movies.csv
-rw-r--r-- 1 root root 725M Sep 26  2018 ratings.csv
-rw-r--r-- 1 root root  38M Sep 26  2018 tags.csv


In [4]:
!head -n 10 /opt/spark-data/ml-latest/movies.csv













In [5]:
!head -n 10 /opt/spark-data/ml-latest/ratings.csv













In [6]:
!head -n 10 /opt/spark-data/ml-latest/tags.csv













In [7]:
ratings = sc.textFile("/opt/spark-data/ml-latest/ratings.csv")

In [8]:
ratings.cache()

/opt/spark-data/ml-latest/ratings.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
%%time
ratings.count()

CPU times: user 12.3 ms, sys: 2.24 ms, total: 14.6 ms
Wall time: 29.8 s


27753445

In [10]:
%%time
ratings.count()

CPU times: user 11.5 ms, sys: 0 ns, total: 11.5 ms
Wall time: 22.6 s


27753445

In [11]:
ratings.take(5)

['userId,movieId,rating,timestamp',
 '1,307,3.5,1256677221',
 '1,481,3.5,1256677456',
 '1,1091,1.5,1256677471',
 '1,1257,4.5,1256677460']

In [12]:
ratingHeader = ratings.first() #extract header
print(ratingHeader)

userId,movieId,rating,timestamp


In [13]:
ratingsOnly = ratings.filter(lambda x: x != ratingHeader)

In [14]:
ratingsOnly.take(5)

['1,307,3.5,1256677221',
 '1,481,3.5,1256677456',
 '1,1091,1.5,1256677471',
 '1,1257,4.5,1256677460',
 '1,1449,4.5,1256677264']

In [15]:
movieRatings = ratingsOnly.map(lambda line: (line.split(",")[1], float(line.split(",")[2])))

In [16]:
movieRatings.take(5)

[('307', 3.5), ('481', 3.5), ('1091', 1.5), ('1257', 4.5), ('1449', 4.5)]

In [17]:
groupByKeyRatings = movieRatings.groupByKey()

groupByKeyRatings.take(5)

[('25827', <pyspark.resultiterable.ResultIterable at 0x7fcee70d5780>),
 ('176669', <pyspark.resultiterable.ResultIterable at 0x7fcee70d56a0>),
 ('167392', <pyspark.resultiterable.ResultIterable at 0x7fcee70d5e80>),
 ('65614', <pyspark.resultiterable.ResultIterable at 0x7fcee70d52b0>),
 ('122758', <pyspark.resultiterable.ResultIterable at 0x7fcee70d5a90>)]

In [18]:
mapValuesToListRatings = groupByKeyRatings.mapValues(list)
mapValuesToListRatings.take(5)

[('25827',
  [3.0,
   4.0,
   5.0,
   3.0,
   4.0,
   4.0,
   4.0,
   3.5,
   2.5,
   4.5,
   4.0,
   3.5,
   5.0,
   4.5,
   1.0,
   4.5,
   4.0,
   4.0,
   1.5,
   3.5,
   4.5,
   2.0,
   4.0,
   4.0,
   3.5,
   3.0,
   1.5,
   4.0,
   4.0,
   3.5,
   3.5,
   4.0,
   4.5,
   3.0,
   3.5,
   5.0,
   3.0,
   3.5,
   4.5,
   0.5,
   4.5,
   4.5,
   3.0,
   4.0,
   4.0,
   4.0,
   4.5,
   3.0,
   4.0,
   5.0,
   3.0,
   3.5,
   4.5,
   3.5,
   3.0,
   3.5,
   3.0,
   4.0,
   3.5,
   4.0,
   2.5,
   4.0,
   3.0,
   3.0,
   3.0,
   2.0,
   4.5,
   3.0,
   3.5,
   5.0,
   4.0,
   3.0,
   4.0,
   3.5,
   3.5,
   4.0,
   4.5,
   5.0,
   4.0,
   4.5,
   3.5,
   4.0,
   4.0,
   4.5,
   3.5,
   4.0,
   4.5,
   4.0,
   4.5,
   3.5,
   3.0,
   4.5,
   2.5,
   4.0,
   3.0,
   4.0,
   3.0,
   4.0,
   4.0,
   4.0,
   3.0,
   3.5,
   4.5,
   4.0,
   4.5,
   4.0,
   2.5,
   4.0,
   4.0,
   4.0,
   4.5,
   4.0,
   2.0,
   1.0,
   3.0,
   3.5,
   3.0,
   5.0,
   4.5,
   3.5,
   4.0,
   4.0,
   4.0,
   3.

In [19]:
avgRatings01 = mapValuesToListRatings.mapValues(lambda V: sum(V) / float(len(V)))

avgRatings01.take(5)

[('25827', 3.746236559139785),
 ('176669', 2.5),
 ('167392', 3.946327683615819),
 ('65614', 2.75),
 ('126174', 2.25)]

In [20]:
test = [4.5, 3.5, 3.5, 4.0, 3.0, 3.5]
sum(test) / len(test)

3.6666666666666665

In [22]:
countsByKey = movieRatings.countByKey()
countsByKey

defaultdict(int,
            {'5453': 244,
             '7763': 124,
             '169160': 1,
             '104803': 13,
             '104805': 3,
             '27756': 17,
             '91421': 7,
             '26175': 16,
             '105601': 4,
             '176389': 21,
             '8292': 81,
             '173923': 1,
             '86463': 6,
             '157448': 2,
             '183323': 30,
             '74866': 5,
             '1050': 1808,
             '93618': 1,
             '3999': 4056,
             '580': 602,
             '2898': 756,
             '193197': 2,
             '160860': 15,
             '2307': 215,
             '7009': 696,
             '173625': 3,
             '163783': 4,
             '174543': 17,
             '105150': 1,
             '163633': 1,
             '4041': 4257,
             '3950': 1454,
             '3661': 375,
             '188239': 2,
             '169690': 3,
             '5040': 2120,
             '131239': 1,
             '110

In [23]:
def sumValues(x,y):
    return (x + y)

sumRatings = movieRatings.reduceByKey(sumValues)
sumRatings.take(5)

[('25827', 1742.0),
 ('176669', 2.5),
 ('167392', 698.5),
 ('65614', 22.0),
 ('122758', 1.0)]

In [24]:
import operator

sumRatings = movieRatings.reduceByKey(operator.add)
sumRatings.take(5)

[('25827', 1742.0),
 ('176669', 2.5),
 ('167392', 698.5),
 ('65614', 22.0),
 ('126174', 4.5)]

In [25]:
avgRatings02 = sumRatings.map(lambda x: (x[0], x[1] / countsByKey.get(x[0])))
avgRatings02.take(5)

[('25827', 3.746236559139785),
 ('176669', 2.5),
 ('167392', 3.946327683615819),
 ('65614', 2.75),
 ('126174', 2.25)]

In [31]:
movies = sc.textFile("/opt/spark-data/ml-latest/movies.csv")
print(type(movies))

<class 'pyspark.rdd.RDD'>


In [30]:
movieHeader = movies.first() #extract header
print(type(movieHeader))
print(movieHeader)

<class 'str'>
movieId,title,genres


In [28]:
movies = movies.filter(lambda x: x != movieHeader)
movies.take(5)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy',
 '3,Grumpier Old Men (1995),Comedy|Romance',
 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
 '5,Father of the Bride Part II (1995),Comedy']

In [33]:
movieInfo = movies.map(lambda line: (line.split(",")[0], ((line.rsplit(",",1)[0]).split(",",1)[1], line.rsplit(",",1)[1])))
movieInfo.take(5)

[('movieId', ('title', 'genres')),
 ('1', ('Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')),
 ('2', ('Jumanji (1995)', 'Adventure|Children|Fantasy')),
 ('3', ('Grumpier Old Men (1995)', 'Comedy|Romance')),
 ('4', ('Waiting to Exhale (1995)', 'Comedy|Drama|Romance'))]

In [34]:
augmentedRatings = avgRatings01.join(movieInfo)
augmentedRatings.take(5)

[('154660', (2.75, ('It Happened in Europe (1948)', 'Drama'))),
 ('87999',
  (2.7916666666666665,
   ('I Am Curious (Blue) (Jag är nyfiken - en film i blått) (1968)',
    'Drama'))),
 ('79794', (2.58, ('Crossworlds (1997)', 'Action|Comedy|Sci-Fi'))),
 ('158769',
  (2.7, ('They Call It Myanmar: Lifting the Curtain (2012)', 'Documentary'))),
 ('117853', (4.0, ('Bill Bailey: Qualmpeddler (2013)', 'Comedy')))]

In [35]:
augmentedRatings.takeOrdered(10, key = lambda x : -x[1][0])

[('129223', (5.0, ("Tarzan's Savage Fury (1952)", 'Action|Adventure'))),
 ('171849', (5.0, ('Without Family (1984)', 'Children|Drama'))),
 ('140363',
  (5.0,
   ('"The Good, the Bad, and Huckleberry Hound (1988)"',
    '(no genres listed)'))),
 ('135222', (5.0, ('Blue Summer (1973)', 'Comedy|Drama'))),
 ('129828',
  (5.0, ('Lotte and the Moonstone Secret (2011)', 'Animation|Children'))),
 ('156309', (5.0, ("It Ain't Hay (1943)", 'Comedy'))),
 ('142809', (5.0, ('Hush Little Baby (2007)', 'Drama|Mystery|Thriller'))),
 ('93967',
  (5.0,
   ('"Keeping the Promise (Sign of the Beaver, The) (1997)"',
    'Children|Drama'))),
 ('133907', (5.0, ('The Tattoo Connection (1978)', 'Action'))),
 ('136826', (5.0, ('The Defiant Ones (1986)', 'Drama')))]

In [36]:
augmentedRatings.takeOrdered(10, key = lambda x : x[1][0])

[('177121', (0.5, ('Goliath (2017)', '(no genres listed)'))),
 ('155707', (0.5, ('Wrecker (2015)', 'Action|Horror|Thriller'))),
 ('171883', (0.5, ('Baptism of Blood (2006)', 'Drama|Thriller'))),
 ('103614', (0.5, ('Topralli (1966)', 'Comedy'))),
 ('169644',
  (0.5,
   ('"Al Murray, The Pub Landlord - Glass of White Wine for the Lady (2004)"',
    'Comedy'))),
 ('166667', (0.5, ('I Can Make You Love Me (1993)', 'Thriller'))),
 ('176427',
  (0.5, ('La Petite mort 2 : Nasty Tapes (2014)', 'Horror|Thriller'))),
 ('124111',
  (0.5, ('Wish You Were Dead (2002)', 'Action|Comedy|Romance|Thriller'))),
 ('112544', (0.5, ('Blue (2003)', 'Action|Drama|War'))),
 ('133407', (0.5, ('Dirty Business (2009)', 'Documentary')))]

### Challenge:

Make appropriate changes so that only movies with average ratings higher than 3.75 and number of ratings totalling at least 1000 are collected.

In [39]:
atleast1000 = mapValuesToListRatings.filter(lambda x: len(x[1])>=1000)

In [42]:
avgRatingschallenge = atleast1000.mapValues(lambda V: sum(V) / float(len(V)))
print(type(avgRatingschallenge))

<class 'pyspark.rdd.PipelinedRDD'>


In [43]:
avgRatingschallenge.take(5)

[('81834', 3.8795802205620777),
 ('69951', 3.4099450045829514),
 ('1371', 3.091344919099392),
 ('5446', 3.8383345836459113),
 ('64622', 3.7822109770338654)]

In [45]:
higherthan375 = avgRatingschallenge.filter(lambda x: x[1]>3.75)

In [46]:
augmentedRatingsChallenge = higherthan375.join(movieInfo)

In [48]:
augmentedRatingsChallenge.take(1000)

[('3916', (3.7803878062897938, ('Remember the Titans (2000)', 'Drama'))),
 ('9010',
  (3.858695652173913,
   ("Love Me If You Dare (Jeux d'enfants) (2003)", 'Drama|Romance'))),
 ('7084',
  (3.853517364203028, ('"Play It Again, Sam (1972)"', 'Comedy|Romance'))),
 ('3037', (3.96530187369882, ('Little Big Man (1970)', 'Western'))),
 ('950', (4.150782937365011, ('"Thin Man, The (1934)"', 'Comedy|Crime'))),
 ('3467', (3.9490219324244222, ('Hud (1963)', 'Drama|Western'))),
 ('2068',
  (3.9797551789077215,
   ('Fanny and Alexander (Fanny och Alexander) (1982)',
    'Drama|Fantasy|Mystery'))),
 ('1674', (3.8286838619184915, ('Witness (1985)', 'Drama|Romance|Thriller'))),
 ('168250', (3.9515629036424698, ('Get Out (2017)', 'Horror'))),
 ('6857',
  (3.7753998118532457,
   ('Ninja Scroll (Jûbei ninpûchô) (1995)',
    'Action|Adventure|Animation|Fantasy'))),
 ('1252',
  (4.1524313561098305,
   ('Chinatown (1974)', 'Crime|Film-Noir|Mystery|Thriller'))),
 ('140174', (3.9798953456348114, ('Room (2015

### Top20 Action

In [85]:
import string

avgRatingschallenge.take(5)
#joined = avgRatingschallenge.join(movieInfo)

[('81834', 3.8795802205620777),
 ('2439', 3.5632984901277585),
 ('6538', 3.4004237288135593),
 ('69951', 3.4099450045829514),
 ('4367', 2.767811579980373)]

In [81]:
avg
filteredbyAction = joined.filter(lambda x: 'action' in x[1][1][1].lower())

In [82]:
filteredbyAction.takeOrdered(20 , key = lambda x: -x[1][0])

[('2019',
  (4.2541157909178215,
   ('Seven Samurai (Shichinin no samurai) (1954)', 'Action|Adventure|Drama'))),
 ('2959',
  (4.230663235786717, ('Fight Club (1999)', 'Action|Crime|Drama|Thriller'))),
 ('908',
  (4.201091113037271,
   ('North by Northwest (1959)',
    'Action|Adventure|Mystery|Romance|Thriller'))),
 ('6016',
  (4.184896558122275,
   ('City of God (Cidade de Deus) (2002)',
    'Action|Adventure|Crime|Drama|Thriller'))),
 ('3030', (4.179297597042514, ('Yojimbo (1961)', 'Action|Adventure'))),
 ('58559',
  (4.173755615654545,
   ('"Dark Knight, The (2008)"', 'Action|Crime|Drama|IMAX'))),
 ('79132',
  (4.1629897528631705,
   ('Inception (2010)', 'Action|Crime|Drama|Mystery|Sci-Fi|Thriller|IMAX'))),
 ('2571',
  (4.149695428470046, ('"Matrix, The (1999)"', 'Action|Sci-Fi|Thriller'))),
 ('1196',
  (4.133481206891313,
   ('Star Wars: Episode V - The Empire Strikes Back (1980)',
    'Action|Adventure|Sci-Fi'))),
 ('1197',
  (4.124807593637763,
   ('"Princess Bride, The (1987)"',