# MapReduce on Movies
Given a dataset named IMDB_Movies.csv, which is available on Canvas, use the Spark Resilient Distributed Dataset (RDD) model to count the number of times a country is involved in a movie. Your output will be key-pairs represented
as [country, text-based bar-chart (count)] and sorted by bar-length. You may create the text-based bar-chart
using an extended ASCII character such as the block character (ASCII character 219. See more ASCII characters here:
https://theasciicode.com.ar/). For scaling, let 1 ASCII character = 1000 movies. Make sure to collect and
display your results in your notebook using the rdd.collect() method.

RDD functions Allowed:

• sc.textFile()

• map()

• flatMap

• reduce()

• reduceByKey

• sortBy()

• groupBy()

In [1]:
from pyspark.sql import SparkSession
import re

In [2]:
spark = SparkSession.builder.master('local[*]') \
                         .appName('CountryCount') \
                         .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/22 22:50:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc = spark.sparkContext.textFile("IMDb_movies.csv")


In [4]:
print(sc)

IMDb_movies.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


In [5]:
print("Count : "+str(sc.count()))

[Stage 0:>                                                          (0 + 2) / 2]

Count : 85856


                                                                                

In [6]:
sc.take(5)

['imdb_title_id,title,original_title,year,date_published,genre,duration,country,language,director,writer,production_company,actors,description,avg_vote,votes,budget,usa_gross_income,worlwide_gross_income,metascore,reviews_from_users,reviews_from_critics',
 'tt0000009,Miss Jerry,Miss Jerry,1894,1894-10-09,Romance,45,USA,None,Alexander Black,Alexander Black,Alexander Black Photoplays,"Blanche Bayliss, William Courtenay, Chauncey Depew",The adventures of a female reporter in the 1890s.,5.9,154,,,,,1.0,2.0',
 'tt0000574,The Story of the Kelly Gang,The Story of the Kelly Gang,1906,1906-12-26,"Biography, Crime, Drama",70,Australia,None,Charles Tait,Charles Tait,J. and N. Tait,"Elizabeth Tait, John Tait, Norman Campbell, Bella Cola, Will Coyne, Sam Crewes, Jack Ennis, John Forde, Vera Linden, Mr. Marshall, Mr. McKenzie, Frank Mills, Ollie Wilson",True story of notorious Australian outlaw Ned Kelly (1855-80).,6.1,589,$ 2250,,,,7.0,7.0',
 'tt0001892,Den sorte drøm,Den sorte drøm,1911,1911-08-19

In [7]:
def parse(text):
    ### Parse values and return a list
    parsedText = []
    multipleValues = []
    collect = False
    
    for word in text:
        
        if not word:
            parsedText.append([word])
            
        elif word[0] == '"' and not collect:
            collect = True
            multipleValues.append(word[1:])
            
        elif word[-1] == '"':
            collect = False
            multipleValues.append(word[:-1])
            parsedText.append(multipleValues.copy())
            multipleValues.clear()

        elif collect:
            multipleValues.append(word)
        else:
            parsedText.append([word])
            
        # print(word)
            
    return parsedText
    
    

In [8]:
lines = sc.map(lambda line: parse(line.split(','))[7])

In [9]:
lines.take(10)

[['country'],
 ['USA'],
 ['Australia'],
 ['Germany', ' Denmark'],
 ['USA'],
 ['Italy'],
 ['USA'],
 ['Germany'],
 ['Italy'],
 ['Romania']]

In [10]:
flatten = lines.flatMap(lambda text: [country.strip(' ') for country in text])

In [11]:
flatten.take(10)

['country',
 'USA',
 'Australia',
 'Germany',
 'Denmark',
 'USA',
 'Italy',
 'USA',
 'Germany',
 'Italy']

In [12]:
count = flatten.map(lambda country: (country,1)) \
              .reduceByKey(lambda a,b: a + b)

In [13]:
count.take(10)

                                                                                

[('country', 1),
 ('USA', 34325),
 ('Australia', 1181),
 ('Denmark', 1026),
 ('France', 8311),
 ('Belgium', 1354),
 ('Hungary', 642),
 ('Mexico', 1173),
 ('Norway', 639),
 ('Austria', 448)]

In [14]:
def setBars(value):
    # '█' = 9608 unicode
    bars = value // 1000
    return bars * chr(9608)

def formatNumbers(value):
    value /= 1000
    return '%.2fk' % value
    

In [15]:
bars = count.map(lambda x: (x[0], setBars(x[1]), formatNumbers(x[1])))

In [16]:
bars.take(10)

[('country', '', '0.00k'),
 ('USA', '██████████████████████████████████', '34.33k'),
 ('Australia', '█', '1.18k'),
 ('Denmark', '█', '1.03k'),
 ('France', '████████', '8.31k'),
 ('Belgium', '█', '1.35k'),
 ('Hungary', '', '0.64k'),
 ('Mexico', '█', '1.17k'),
 ('Norway', '', '0.64k'),
 ('Austria', '', '0.45k')]

In [17]:
sortedOutput = bars.sortBy(lambda x: len(x[1]), ascending = False)

In [18]:
sortedOutput.take(10)

[('USA', '██████████████████████████████████', '34.33k'),
 ('France', '████████', '8.31k'),
 ('UK', '███████', '7.49k'),
 ('India', '██████', '6.37k'),
 ('Italy', '█████', '5.06k'),
 ('Germany', '███', '3.72k'),
 ('Canada', '███', '3.62k'),
 ('Japan', '███', '3.70k'),
 ('Spain', '██', '2.73k'),
 ('Australia', '█', '1.18k')]

In [19]:
sortedOutput.saveAsTextFile('CountryCount.txt')

In [20]:
sortedOutput.collect()

[('USA', '██████████████████████████████████', '34.33k'),
 ('France', '████████', '8.31k'),
 ('UK', '███████', '7.49k'),
 ('India', '██████', '6.37k'),
 ('Italy', '█████', '5.06k'),
 ('Germany', '███', '3.72k'),
 ('Canada', '███', '3.62k'),
 ('Japan', '███', '3.70k'),
 ('Spain', '██', '2.73k'),
 ('Australia', '█', '1.18k'),
 ('Denmark', '█', '1.03k'),
 ('Belgium', '█', '1.35k'),
 ('Mexico', '█', '1.17k'),
 ('China', '█', '1.17k'),
 ('Netherlands', '█', '1.03k'),
 ('South Korea', '█', '1.30k'),
 ('Sweden', '█', '1.23k'),
 ('Russia', '█', '1.08k'),
 ('Hong Kong', '█', '1.88k'),
 ('Turkey', '█', '1.55k'),
 ('West Germany', '█', '1.11k'),
 ('country', '', '0.00k'),
 ('Hungary', '', '0.64k'),
 ('Norway', '', '0.64k'),
 ('Austria', '', '0.45k'),
 ('Soviet Union', '', '0.87k'),
 ('Switzerland', '', '0.64k'),
 ('Brazil', '', '0.98k'),
 ('Portugal', '', '0.43k'),
 ('Argentina', '', '0.85k'),
 ('Poland', '', '0.90k'),
 ('Finland', '', '0.68k'),
 ('Greece', '', '0.77k'),
 ('East Germany', '', '0.