**Count how many times each movie rating value appears in the source file.**

**Source file name:** u.data

**First 10 lines of source file:**<br>
196	242	3	881250949<br>
186	302	3	891717742<br>
22	377	1	878887116<br>
244	51	2	880606923<br>
166	346	1	886397596<br>
298	474	4	884182806<br>
115	265	2	881171488<br>
253	465	5	891628467<br>
305	451	3	886324817<br>
6	86	3	883603013<br>

**Spin up Spark cluster.**

In [1]:
# Import packages.
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
import os
import json
import collections

# (Re)start the Spark context so this cell can be re-run safely.
# Referencing `sc` raises NameError on a fresh kernel, where there is nothing
# to stop; in every case the `finally` clause builds a new context.
try:
    sc.stop()
except NameError:
    pass
finally:
    print('Spinning up Spark cluster ...')
    # Local master, application named after this notebook's task.
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("RatingsHistogram")
    sc = SparkContext(conf=conf)

# Leave the context as the cell's last expression so its summary is displayed.
sc

Spinning up Spark cluster ...


Get the current working directory, load the settings from the configuration file, and use them to build the full path to the data file.

In [4]:
# Get current working directory; configuration.json is read from this directory.
current_working_directory = os.getcwd()

# Load configuration file.
# The path is built with os.path.join instead of a hard-coded '\' separator:
# the literal '\configuration.json' only resolves on Windows and relies on the
# invalid escape sequence '\c', which newer Python versions warn about.
configuration_path = os.path.join(current_working_directory, 'configuration.json')
with open(configuration_path, 'r') as configuration_file:
    dict_configurations = json.load(configuration_file)

# Get path part for u.data file from configuration file.
u_data_path_part = dict_configurations['u.data_path_part']

# Get current working directory's parent; the path part is joined onto it below.
current_working_directory_parent = os.path.dirname(current_working_directory)

# Get full, normalized absolute path for u.data file.
u_data_path = os.path.abspath(os.path.join(current_working_directory_parent, u_data_path_part))

In [17]:
# Load the data file into an RDD of text lines.
lines = sc.textFile(u_data_path)

# Report how many lines the RDD holds.
line_count = lines.count()
print(line_count)

# Preview the first 10 lines as a sanity check.
lines.take(10)

100000


['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']

In [18]:
# Keep only the rating: the third whitespace-separated field of every line.
ratings = lines.map(lambda line: line.split()[2])

# Report how many ratings were extracted.
rating_count = ratings.count()
print(rating_count)

# Preview the first 10 extracted ratings.
ratings.take(10)

100000


['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

In [19]:
# Tally how often each distinct rating occurs; countByValue yields a
# mapping of (value, count) pairs.
result = ratings.countByValue()

# Show which mapping type came back.
result_type = type(result)
print(result_type)

# Display the tallies themselves.
result

<class 'collections.defaultdict'>


defaultdict(int, {'1': 6110, '2': 11370, '3': 27145, '4': 34174, '5': 21201})

In [20]:
# Sort the (rating, count) pairs by rating, then store them in a
# collections.OrderedDict, a dict subclass that remembers insertion order.
sorted_pairs = sorted(result.items())
sortedResults = collections.OrderedDict(sorted_pairs)

# Show the type of the sorted container.
print(type(sortedResults))

# Display the sorted tallies.
sortedResults

<class 'collections.OrderedDict'>


OrderedDict([('1', 6110),
             ('2', 11370),
             ('3', 27145),
             ('4', 34174),
             ('5', 21201)])

In [21]:
# Emit one "<rating> <count>" line per entry, in the order sortedResults holds them.
for rating, count in sortedResults.items():
    print("%s %i" % (rating, count))

1 6110
2 11370
3 27145
4 34174
5 21201
