# Most Popular Movies using Spark (ID only)

Here I'll show how to use Spark to find the most popular movies (the ones with the most ratings) in a movie database with 100k ratings from different users. In this first version, movies are identified by ID only.

In [None]:
from pyspark import SparkConf, SparkContext
import collections
import time

import matplotlib.pyplot as plt

## Setup

In [None]:
# Configure a Spark application named 'PopularMovies' with the 'local' master.
conf = SparkConf().setMaster('local').setAppName('PopularMovies')
# getOrCreate reuses an already-running context, so re-executing this cell in
# the same kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

## Processing files

In [None]:
# Wall-clock reference point; the final cell subtracts it from the end time
# to report how long the processing cells took.
start_time = time.time()

In [None]:
# Load the raw ratings file as an RDD of text lines.
lines = sc.textFile('ml-100k/u.data')
# Emit one (movie_id, 1) pair per line; the movie id is the second
# whitespace-separated field of each line.
movies = lines.map(lambda row: (int(row.split()[1]), 1))

In [None]:
# Preview the first 10 pairs. take(10) fetches only what is shown instead of
# collecting the entire RDD to the driver and slicing it there.
movies.take(10)

In [None]:
# Add up the 1s for each movie id, giving the number of ratings per movie.
movieCounts = movies.reduceByKey(lambda left, right: left + right)

In [None]:
# Preview the first 10 (movie_id, count) pairs without collecting the whole RDD.
movieCounts.take(10)

In [None]:
# Swap each (movie_id, count) pair to (count, movie_id) so that the count
# becomes the key and can be used for sorting.
flipped = movieCounts.map(lambda pair: (pair[1], pair[0]))

In [None]:
# Preview the first 10 (count, movie_id) pairs without collecting the whole RDD.
flipped.take(10)

In [None]:
# Sort by key (the rating count) in descending order, so the most-rated
# movies come first.
sortedMovies = flipped.sortByKey(ascending=False)

In [None]:
# Show the 10 most-rated movies. take(10) returns the first 10 elements of the
# sorted RDD without collecting every row to the driver.
sortedMovies.take(10)

In [None]:
# Report the wall-clock time elapsed since start_time as HH:MM:SS.ss.
end_time = time.time()
hours, rem = divmod(end_time - start_time, 3600)
minutes, seconds = divmod(rem, 60)
print(f"{int(hours):0>2}:{int(minutes):0>2}:{seconds:05.2f}")

---

# Most Popular Movies using Spark (with Movie name)

In [1]:
from pyspark import SparkConf, SparkContext
import collections
import time

import matplotlib.pyplot as plt

## Setup

In [2]:
def loadMovieNames(path='ml-100k/u.item'):
    """Build a movie-id -> title lookup from a pipe-delimited item file.

    Each non-blank line is split on '|'; the first field is parsed as the
    integer movie id and the second field is stored as the title.

    Args:
        path: Location of the item file. The default uses the lower-case
            file name ``u.item`` so the lookup also works on case-sensitive
            filesystems.

    Returns:
        dict mapping int movie id to title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank line does not start with an integer id.
        IndexError: if a non-blank line has no '|' separated title field.
    """
    movieNames = {}
    # The file is decoded as ISO-8859-1, so every byte maps to a character
    # and non-ASCII titles do not raise a decode error.
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [3]:
# Configure a Spark application named 'PopularMovies' with the 'local' master.
conf = SparkConf().setMaster('local').setAppName('PopularMovies')
# getOrCreate reuses an already-running context, so re-executing this cell in
# the same kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# Wrap the movie-id -> title dict in a Spark broadcast variable; the lookup
# cell further down reads it back through nameDict.value.
nameDict = sc.broadcast(loadMovieNames())

## Processing files

In [5]:
# Wall-clock reference point; the final cell subtracts it from the end time
# to report how long the processing cells took.
start_time = time.time()

In [6]:
# Load the raw ratings file as an RDD of text lines.
lines = sc.textFile('ml-100k/u.data')
# Emit one (movie_id, 1) pair per line; the movie id is the second
# whitespace-separated field of each line.
movies = lines.map(lambda row: (int(row.split()[1]), 1))

In [7]:
# Preview the first 10 pairs. take(10) fetches only what is shown instead of
# collecting the entire RDD to the driver and slicing it there.
movies.take(10)

[(242, 1),
 (302, 1),
 (377, 1),
 (51, 1),
 (346, 1),
 (474, 1),
 (265, 1),
 (465, 1),
 (451, 1),
 (86, 1)]

In [8]:
# Add up the 1s for each movie id, giving the number of ratings per movie.
movieCounts = movies.reduceByKey(lambda left, right: left + right)

In [9]:
# Preview the first 10 (movie_id, count) pairs without collecting the whole RDD.
movieCounts.take(10)

[(242, 117),
 (302, 297),
 (377, 13),
 (51, 81),
 (346, 126),
 (474, 194),
 (265, 227),
 (465, 85),
 (451, 170),
 (86, 150)]

In [10]:
# Swap each (movie_id, count) pair to (count, movie_id) so that the count
# becomes the key and can be used for sorting.
flipped = movieCounts.map(lambda pair: (pair[1], pair[0]))

In [11]:
# Preview the first 10 (count, movie_id) pairs without collecting the whole RDD.
flipped.take(10)

[(117, 242),
 (297, 302),
 (13, 377),
 (81, 51),
 (126, 346),
 (194, 474),
 (227, 265),
 (85, 465),
 (170, 451),
 (150, 86)]

In [12]:
# Sort by key (the rating count) in descending order, so the most-rated
# movies come first.
sortedMovies = flipped.sortByKey(ascending=False)

In [13]:
# Show the 10 most-rated movies. take(10) returns the first 10 elements of the
# sorted RDD without collecting every row to the driver.
sortedMovies.take(10)

[(583, 50),
 (509, 258),
 (508, 100),
 (507, 181),
 (485, 294),
 (481, 286),
 (478, 288),
 (452, 1),
 (431, 300),
 (429, 121)]

In [14]:
# Replace each (count, movie_id) pair with (title, count), looking the title
# up in the broadcast id -> title dict.
sortedMoviesWithNames = sortedMovies.map(
    lambda pair: (nameDict.value[pair[1]], pair[0])
)

In [15]:
# Show the first 10 (title, count) pairs of the sorted result.
sortedMoviesWithNames.take(10)

[('Star Wars (1977)', 583),
 ('Contact (1997)', 509),
 ('Fargo (1996)', 508),
 ('Return of the Jedi (1983)', 507),
 ('Liar Liar (1997)', 485),
 ('English Patient, The (1996)', 481),
 ('Scream (1996)', 478),
 ('Toy Story (1995)', 452),
 ('Air Force One (1997)', 431),
 ('Independence Day (ID4) (1996)', 429)]

In [16]:
# Report the wall-clock time elapsed since start_time as HH:MM:SS.ss.
end_time = time.time()
hours, rem = divmod(end_time - start_time, 3600)
minutes, seconds = divmod(rem, 60)
print(f"{int(hours):0>2}:{int(minutes):0>2}:{seconds:05.2f}")

00:00:03.08
