In [2]:
!pip install mrjob nltk

Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
     ---------------------------------------- 1.5/1.5 MB 530.6 kB/s eta 0:00:00
Collecting regex>=2021.8.3
  Downloading regex-2022.10.31-cp311-cp311-win_amd64.whl (267 kB)
     ------------------------------------ 267.7/267.7 kB 748.4 kB/s eta 0:00:00
Installing collected packages: regex, nltk
Successfully installed nltk-3.8.1 regex-2022.10.31


# Assignment on Map-Reduce

In the following questions, you will solve real problems with the techniques you have learned before. You will be working with data of **google play dataset** which includes the following datasets: 
1. googleplaystore
2. googleplaystore review

*You can find the data files on the archive have attached to this exercise.*

**T** (10pts) Number of applications according to the version of Android(output must be sorted)


> result: 
`<version, count>`

In [85]:
%%file version_count.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
class MRVersionCount(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def steps(self):
        return [
            MRStep(mapper=self.mapper_count_version,
                   combiner=self.combiner_count_version,
                   reducer=self.reducer_count_version),
            ]
    def mapper_count_version(self, _, line):
        version=line.strip().split('∑')[-1]
        if version not in ['Android Ver']:
            yield version,1

    def combiner_count_version(self, version, counts):
        yield (version, sum(counts))

    def reducer_count_version(self, version, counts):
        yield version, sum(counts)
        pass

if __name__=='__main__':
    MRVersionCount.run()

Overwriting version_count.py


#### Important note
When using TextValueProtocol, each line behave as utf-8 encoded str.

In [86]:
!python version_count.py googleplaystore.txt

"1.0 and up"	2
"1.5 and up"	20
"1.6 and up"	116
"2.0 and up"	32
"2.0.1 and up"	7
"2.1 and up"	134
"2.2 - 7.1.1"	1
"2.2 and up"	244
"2.3 and up"	652
"2.3.3 and up"	281
"3.0 and up"	241
"3.1 and up"	10
"3.2 and up"	36
"4.0 and up"	1375
"4.0.3 - 7.1.1"	2
"4.0.3 and up"	1501
"4.1 - 7.1.1"	1
"4.1 and up"	2451
"4.2 and up"	394
"4.3 and up"	243
"4.4 and up"	980
"4.4W and up"	12
"5.0 - 6.0"	1
"5.0 - 7.1.1"	1
"5.0 - 8.0"	2
"5.0 and up"	600
"5.1 and up"	24
"6.0 and up"	60
"7.0 - 7.1.1"	1
"7.0 and up"	42
"7.1 and up"	3
"8.0 and up"	6
"NaN"	2
"Varies with device"	1362


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\version_count.Mohammadreza.20230109.191151.402070
Running step 1 of 1...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\version_count.Mohammadreza.20230109.191151.402070\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\version_count.Mohammadreza.20230109.191151.402070\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\version_count.Mohammadreza.20230109.191151.402070...


In [115]:
%%file version_count_optimized.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
from mrjob import protocol

class MRVersionCount(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def configure_args(self):
        super(MRVersionCount, self).configure_args()
        self.add_passthru_arg(
        '--ignore-words', 
        type=str, 
        default='', 
        help='how many lines skipped from the first of input file')

    def steps(self):
        return [
            MRStep(
                mapper_init=self.mapper_skip_lines,
                mapper=self.mapper_count_version,
                combiner=self.combiner_count_version,
                reducer=self.reducer_count_version),
            ]
    def mapper_skip_lines(self):
        self.ignore_words = self\
            .options\
                .ignore_words\
                    .strip()\
                        .split(',')
    def mapper_count_version(self, _, line):
        version=line.strip().split('∑')[-1]
        if version not in self.ignore_words:
            yield version,1

    def combiner_count_version(self, version, counts):
        yield (version, sum(counts))

    def reducer_count_version(self, version, counts):
        yield version, sum(counts)
        pass

if __name__=='__main__':
    MRVersionCount.run()

Overwriting version_count_optimized.py


In [106]:
!python version_count_optimized.py \
    googleplaystore.txt \
    --ignore-words="Android Ver,NaN"

"1.0 and up"	2
"1.5 and up"	20
"1.6 and up"	116
"2.0 and up"	32
"2.0.1 and up"	7
"2.1 and up"	134
"2.2 - 7.1.1"	1
"2.2 and up"	244
"2.3 and up"	652
"2.3.3 and up"	281
"3.0 and up"	241
"3.1 and up"	10
"3.2 and up"	36
"4.0 and up"	1375
"4.0.3 - 7.1.1"	2
"4.0.3 and up"	1501
"4.1 - 7.1.1"	1
"4.1 and up"	2451
"4.2 and up"	394
"4.3 and up"	243
"4.4 and up"	980
"4.4W and up"	12
"5.0 - 6.0"	1
"5.0 - 7.1.1"	1
"5.0 - 8.0"	2
"5.0 and up"	600
"5.1 and up"	24
"6.0 and up"	60
"7.0 - 7.1.1"	1
"7.0 and up"	42
"7.1 and up"	3
"8.0 and up"	6
"Varies with device"	1362


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\version_count_optimized.Mohammadreza.20230109.192430.052762
Running step 1 of 1...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\version_count_optimized.Mohammadreza.20230109.192430.052762\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\version_count_optimized.Mohammadreza.20230109.192430.052762\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\version_count_optimized.Mohammadreza.20230109.192430.052762...


#### Mrjob Test Case

In [23]:
from unittest import TestCase
import unittest
from version_count_optimized import MRVersionCount

class MRVersionCountTestCase(TestCase):
    def test_mapper(self):
        j = MRVersionCount([])
        j.mapper_skip_lines()
        self.assertEqual(next(j.mapper_count_version(None,'NaN')),('NaN',1))

    def test_mapper_ignore_words(self):
        j = MRVersionCount(['--ignore-words=Android Ver,NaN'])
        j.mapper_skip_lines()
        with self.assertRaises(StopIteration):
            next(j.mapper_count_version(None, "NaN"))

if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(MRVersionCountTestCase)
    unittest.TextTestRunner().run(suite)

..
----------------------------------------------------------------------
Ran 2 tests in 0.008s

OK


**T** (10pts) K of the best applications in every category(K should be specified by user)


> result:
`<appname,{other fields} >`

In [114]:
%%file k_best_app.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
from mrjob import protocol

class MRKBestApp(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def steps(self):
        return [
            MRStep(mapper=self.preprocess),
            MRStep(mapper=self.mapper,
                reducer=self.reducer),
            MRStep(
                mapper=self.mapper_k_best,
                reducer=self.reducer_k_best
                )
        ]
    def configure_args(self):
        super(MRKBestApp, self).configure_args()
        self.add_passthru_arg(
        '--k-best', 
        type=int, 
        default=1, 
        help='top k best app based on review')

    
    def preprocess(self,_,line):
        items=line.strip().split('∑')
        app=items[0]
        rating=items[2]
        try:
            rating=int(rating)
        except :
            rating=0

        extended_items=items+[rating]
        yield app,extended_items

    def mapper(self, app, extended_items):
        
        category=extended_items[1]
        yield category,extended_items


    def reducer(self, category,items):
        yield category, sorted(items,key=lambda x: x[-1],reverse=True)[:self.options.k_best]
    
    def mapper_k_best(self,category,items):
        for item in items:
            yield item[0],item
    def reducer_k_best(self ,app,items):
        yield app,next(items)
if __name__=='__main__':
    MRKBestApp.run()

Overwriting k_best_app.py


In [116]:
!python k_best_app.py \
    googleplaystore.txt \
    --k-best=3

"A&E - Watch Full Episodes of TV Shows"	["A&E - Watch Full Episodes of TV Shows", "ENTERTAINMENT", "4", "29706", "19M", "1,000,000+", "Free", "0", "Teen", "Entertainment", "16-Jul-18", "3.1.4", "4.4 and up", 4]
"A-Y Collection"	["A-Y Collection", "SHOPPING", "5", "2", "2.9M", "100+", "Free", "0", "Teen", "Shopping", "16-Feb-18", "1.2", "4.1 and up", 5]
"ADS-B Driver"	["ADS-B Driver", "TOOLS", "5", "2", "6.3M", "100+", "Paid", "\"$1.99 \"", "Everyone", "Tools", "15-May-18", "1.19", "4.4 and up", 5]
"AI Today : Artificial Intelligence News & AI 101"	["AI Today : Artificial Intelligence News & AI 101", "NEWS_AND_MAGAZINES", "5", "43", "2.3M", "100+", "Free", "0", "Everyone", "News & Magazines", "22-Jun-18", "1", "4.4 and up", 5]
"AJ Cam"	["AJ Cam", "PHOTOGRAPHY", "5", "44", "2.8M", "100+", "Free", "0", "Everyone", "Photography", "23-Jan-18", "1", "4.1 and up", 5]
"AJ Gray Dark Icon Pack"	["AJ Gray Dark Icon Pack", "PERSONALIZATION", "5", "2", "35M", "10+", "Paid", "\"$0.99 \"", "Everyone"

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\k_best_app.Mohammadreza.20230109.211433.518042
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\k_best_app.Mohammadreza.20230109.211433.518042\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\k_best_app.Mohammadreza.20230109.211433.518042\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\k_best_app.Mohammadreza.20230109.211433.518042...


**T** (20pts) Number of applications in every category according to version of Android(output must be sorted on Count)

> result:
`<category, {count, version} >`

In [126]:
%%file category_version_count.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
from mrjob import protocol

class MRCategoryVersionCount(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def steps(self):
        return [
            # MRStep(mapper=self.preprocess),
            MRStep(mapper=self.mapper,
                reducer=self.reducer),
            MRStep(
                mapper=self.mapper_category_version,
                reducer=self.reducer_category_version
                )
        ]

    def mapper(self, app, line):
        items=line.strip().split('∑')
        category=items[1]
        version=items[-1]
        yield (category,version),1


    def reducer(self, key,value):
        yield key, sum(value)
    
    def mapper_category_version(self,key,count):
        yield key[0],(count,key[1])
    def reducer_category_version(self ,category,versions):
        yield category,sorted(versions,reverse=True)
if __name__=='__main__':
    MRCategoryVersionCount.run()

Overwriting category_version_count.py


In [127]:
!python category_version_count.py \
    googleplaystore.txt 

"ART_AND_DESIGN"	[[21, "4.1 and up"], [16, "4.0.3 and up"], [8, "2.3 and up"], [7, "4.0 and up"], [4, "4.2 and up"], [3, "4.4 and up"], [2, "Varies with device"], [2, "3.0 and up"], [1, "5.0 and up"], [1, "2.3.3 and up"]]
"AUTO_AND_VEHICLES"	[[15, "4.1 and up"], [15, "4.0 and up"], [12, "4.0.3 and up"], [8, "4.4 and up"], [7, "Varies with device"], [7, "4.2 and up"], [6, "5.0 and up"], [4, "6.0 and up"], [3, "2.3.3 and up"], [3, "2.3 and up"], [2, "4.3 and up"], [1, "4.4W and up"], [1, "3.2 and up"], [1, "2.2 and up"]]
"BEAUTY"	[[18, "4.0.3 and up"], [11, "4.0 and up"], [9, "4.1 and up"], [4, "5.0 and up"], [3, "Varies with device"], [3, "3.0 and up"], [1, "4.4 and up"], [1, "4.2 and up"], [1, "2.3 and up"], [1, "2.2 and up"], [1, "1.6 and up"]]
"BOOKS_AND_REFERENCE"	[[43, "4.0 and up"], [34, "4.1 and up"], [31, "Varies with device"], [30, "4.0.3 and up"], [18, "2.3 and up"], [17, "1.6 and up"], [12, "4.4 and up"], [12, "3.0 and up"], [10, "2.2 and up"], [6, "4.2 and up"], [6, "2.1 and

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\category_version_count.Mohammadreza.20230109.212820.546214
Running step 1 of 2...
Running step 2 of 2...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\category_version_count.Mohammadreza.20230109.212820.546214\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\category_version_count.Mohammadreza.20230109.212820.546214\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\category_version_count.Mohammadreza.20230109.212820.546214...


**T** (60pts) In the review dataset which words have more occurrence in every application(output must be sorted on Count)

> result: 
`<appname, {count, word1, word2} >`

`hint:` use secondary sort

In [144]:
%%file review_word_occurrence.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
from mrjob import protocol

class MRReviewWordOccurrence(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                reducer=self.reducer),
            MRStep(
                mapper=self.mapper_app_word,
                reducer=self.reducer_app_word
                ),
            MRStep(
                reducer=self.reducer_second_sort
                ),
            MRStep(
                mapper=self.mapper_second_sort
            ),
        ]

    def preprocess(self,_,line):
        items=line.strip().split('∑')
        app=items[0]
        review=items[1]
        if review.lower() != 'nan':
            tokens=review.split()
            for i in range(len(tokens)-2):
                yield (app,tokens[i],
                        tokens[i+1]),1
    def mapper(self, _, line):
        items=line.strip().split('∑')
        app=items[0]
        review=items[1]
        if review.lower() != 'nan':
            tokens=review.split()
            for i in range(len(tokens)-2):
                yield (app,tokens[i],
                        tokens[i+1]),1

    def reducer(self, key,value):
        yield key, sum(value)  

    def mapper_app_word(self,key,count):
        yield key[0],(count,key[1],key[2])
    def reducer_app_word(self ,app,word_count):
        yield None,(app,max(word_count,key=lambda x:x[0]))
    
    
    def reducer_second_sort(self,_,items):
        yield None,sorted(items,key=lambda x:x[1][0],reverse=True)
        
    
    def mapper_second_sort(self,_,items):
        for item in items:
            yield item[0],item[1]


if __name__=='__main__':
    MRReviewWordOccurrence.run()

Overwriting review_word_occurrence.py


In [145]:
!python review_word_occurrence.py \
    googleplaystore_user_reviews.txt 

"Helix Jump"	[53, "I", "paid"]
"Gyft - Mobile Gift Card Wallet"	[45, "gift", "card"]
"Candy Crush Saga"	[42, "I", "like"]
"Google Photos"	[41, "I", "want"]
"8fit Workouts & Meal Planner"	[35, "I", "love"]
"Duolingo: Learn Languages Free"	[32, "I", "think"]
"Angry Birds Classic"	[31, "I", "get"]
"Crunchyroll - Everything Anime"	[30, "I", "love"]
"Bowmasters"	[29, "I", "like"]
"ColorNote Notepad Notes"	[24, "I", "love"]
"DC Super Hero Girls\u2122"	[24, "I", "love"]
"DRAGON BALL LEGENDS"	[24, "The", "game"]
"Farm Heroes Saga"	[22, "gold", "bars"]
"Flow Free"	[22, "I", "love"]
"Clash Royale"	[21, "I", "think"]
"Fishdom"	[21, "I", "love"]
"CBS Sports App - Scores, News, Stats & Watch Live"	[20, "I", "want"]
"Dr. Panda & Toto's Treehouse"	[20, "I", "love"]
"Dude Perfect 2"	[20, "I", "love"]
"8 Ball Pool"	[19, "I", "tried"]
"Diary with lock"	[19, "I", "love"]
"Facebook"	[19, "I", "can't"]
"Gardenscapes"	[19, "I", "love"]
"Home Street \u2013 Home Design Game"	[19, "I", "love"]
"21-Day Meditati

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence.Mohammadreza.20230109.220420.454142
Running step 1 of 4...
Running step 2 of 4...
Running step 3 of 4...
Running step 4 of 4...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence.Mohammadreza.20230109.220420.454142\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence.Mohammadreza.20230109.220420.454142\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence.Mohammadreza.20230109.220420.454142...


In [7]:
%%file review_word_occurrence_refactored.py
from mrjob.job import MRJob,MRStep
from mrjob.protocol import TextValueProtocol
from mrjob import protocol
from nltk.tokenize import word_tokenize
class MRReviewWordOccurrenceRefactored(MRJob):
    INPUT_PROTOCOL=TextValueProtocol
    def steps(self):
        return [
            MRStep(mapper=self.preprocess,
                reducer=self.reducer),
            MRStep(
                mapper=self.mapper_app_word,
                reducer=self.reducer_app_word
                ),
            MRStep(
                reducer=self.reducer_second_sort
                ),
        ]

    def preprocess(self,_,line):
        items=line.strip().split('∑')
        app=items[0]
        review=items[1]
        if review.lower() != 'nan':
            tokens=[x for 
                    x in word_tokenize(review) 
                    if x.isalpha()]
            for i in range(len(tokens)-2):
                yield (app,tokens[i],
                        tokens[i+1]),1
    

    def reducer(self, key,value):
        yield key, sum(value)  

    def mapper_app_word(self,key,count):
        yield key[0],(count,key[1],key[2])
    def reducer_app_word(self ,app,word_count):
        yield None,(app,max(word_count,key=lambda x:x[0]))
    
    
    def reducer_second_sort(self,_,items):
        for item in sorted(items,key=lambda x:x[1][0],reverse=True):
            yield item[0],item[1]
        
    


if __name__=='__main__':
    MRReviewWordOccurrenceRefactored.run()

Overwriting review_word_occurrence_refactored.py


In [8]:
!python review_word_occurrence_refactored.py \
    googleplaystore_user_reviews.txt 

"Helix Jump"	[71, "many", "ads"]
"Gyft - Mobile Gift Card Wallet"	[57, "gift", "card"]
"Google Photos"	[45, "I", "want"]
"Candy Crush Saga"	[42, "I", "like"]
"8fit Workouts & Meal Planner"	[35, "I", "love"]
"Angry Birds Classic"	[34, "time", "I"]
"Crunchyroll - Everything Anime"	[33, "I", "love"]
"Duolingo: Learn Languages Free"	[32, "I", "think"]
"Bowmasters"	[29, "I", "like"]
"8 Ball Pool"	[28, "game", "I"]
"Flow Free"	[26, "I", "love"]
"Farm Heroes Saga"	[25, "gold", "bars"]
"Clash Royale"	[24, "This", "game"]
"ColorNote Notepad Notes"	[24, "I", "love"]
"DC Super Hero Girls\u2122"	[24, "I", "love"]
"DRAGON BALL LEGENDS"	[24, "The", "game"]
"Dr. Panda & Toto's Treehouse"	[24, "I", "love"]
"Calorie Counter - Macros"	[22, "tracking", "macros"]
"Candy Crush Soda Saga"	[22, "game", "I"]
"Alto's Adventure"	[21, "game", "I"]
"Fishdom"	[21, "I", "love"]
"Garena Free Fire"	[21, "new", "update"]
"Hello Kitty Nail Salon"	[21, "many", "ads"]
"Battlelands Royale"	[20, "battle", "pass"]
"CBS Spor

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence_refactored.Mohammadreza.20230110.100033.892528
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...
job output is in C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence_refactored.Mohammadreza.20230110.100033.892528\output
Streaming final output from C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence_refactored.Mohammadreza.20230110.100033.892528\output...
Removing temp directory C:\Users\MOHAMM~1\AppData\Local\Temp\review_word_occurrence_refactored.Mohammadreza.20230110.100033.892528...
