In [4]:
%%file RatingCounter.py

from mrjob.job import MRJob

class MRRatingCounter(MRJob):
    """Count how many times each rating value appears in tab-separated rating data.

    Each input line must contain exactly four tab-separated fields
    (user ID, movie ID, rating, timestamp); any other field count raises
    ValueError from the tuple unpacking in the mapper.
    """

    def mapper(self, key, line):
        # Only the third column (the rating) is needed; the rest are discarded.
        _user, _movie, rating, _stamp = line.split('\t')
        yield rating, 1

    def reducer(self, rating, occurences):
        # The mapper emits a 1 per rating, so the sum is the occurrence count.
        total = sum(occurences)
        yield rating, total


if __name__ == '__main__':
    MRRatingCounter.run()

Writing RatingCounter.py


In [5]:
! python RatingCounter.py ml-100k/u.data

No configs found; falling back on auto-configuration
Creating temp directory /tmp/RatingCounter.prk.20160529.073518.579586
Running step 1 of 1...
Streaming final output from /tmp/RatingCounter.prk.20160529.073518.579586/output...
"1"	6110
"2"	11370
"3"	27145
"4"	34174
"5"	21201
Removing temp directory /tmp/RatingCounter.prk.20160529.073518.579586...


In [7]:
%%file FriendsByAge.py

from mrjob.job import MRJob

class MRFriendsByAge(MRJob):
    """Average number of friends per age, from ID,name,age,friends CSV rows."""

    def mapper(self, key, line):
        # Exactly four comma-separated columns are expected.
        _id, _name, age, friend_count = line.split(',')
        yield age, float(friend_count)

    def reducer(self, age, numFriends):
        # Materialise the values once so they can be both summed and counted.
        counts = list(numFriends)
        yield age, sum(counts) / len(counts)


if __name__ == '__main__':
    MRFriendsByAge.run()

Writing FriendsByAge.py


In [9]:
! python FriendsByAge.py fakefriends.csv > friendsbyage.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/FriendsByAge.prk.20160529.090856.546668
Running step 1 of 1...
Streaming final output from /tmp/FriendsByAge.prk.20160529.090856.546668/output...
Removing temp directory /tmp/FriendsByAge.prk.20160529.090856.546668...


In [10]:
%%file MinTemperatures.py

from mrjob.job import MRJob

class MRMinTemperature(MRJob):
    """Lowest TMIN reading per weather station, reported in degrees Fahrenheit."""

    def MakeFahrenheit(self, tenthsOfCelsius):
        """Convert a reading in tenths of a degree Celsius to Fahrenheit."""
        return float(tenthsOfCelsius) / 10.0 * 1.8 + 32.0

    def mapper(self, key, line):
        # Rows must have exactly eight comma-separated columns; only the
        # station, the observation type and the reading are used.
        (location, _date, obs_type, data,
         _c4, _c5, _c6, _c7) = line.split(',')
        if obs_type != 'TMIN':
            return
        yield location, self.MakeFahrenheit(data)

    def reducer(self, location, temps):
        coldest = min(temps)
        yield location, coldest


if __name__ == '__main__':
    MRMinTemperature.run()

Writing MinTemperatures.py


In [11]:
! python MinTemperatures.py 1800.csv > mintemps.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MinTemperatures.prk.20160529.091916.488311
Running step 1 of 1...
Streaming final output from /tmp/MinTemperatures.prk.20160529.091916.488311/output...
Removing temp directory /tmp/MinTemperatures.prk.20160529.091916.488311...


In [12]:
%%file MaxTemperatures.py

from mrjob.job import MRJob

class MRMaxTemperature(MRJob):
    """Highest TMAX reading per weather station, reported in degrees Fahrenheit."""

    def MakeFahrenheit(self, tenthsOfCelsius):
        """Convert a reading given in tenths of a degree Celsius to Fahrenheit."""
        degrees_c = float(tenthsOfCelsius) / 10.0
        return degrees_c * 1.8 + 32.0

    def mapper(self, key, line):
        columns = line.split(',')
        # Exactly eight columns are required (anything else raises ValueError);
        # only the station, the element type and the value are used.
        location, _, element, value, _, _, _, _ = columns
        if element == 'TMAX':
            yield location, self.MakeFahrenheit(value)

    def reducer(self, location, temps):
        yield location, max(temps)


if __name__ == '__main__':
    MRMaxTemperature.run()

Writing MaxTemperatures.py


In [13]:
! python MaxTemperatures.py 1800.csv > maxtemps.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MaxTemperatures.prk.20160529.092328.866845
Running step 1 of 1...
Streaming final output from /tmp/MaxTemperatures.prk.20160529.092328.866845/output...
Removing temp directory /tmp/MaxTemperatures.prk.20160529.092328.866845...


In [16]:
%%file WordFrequency.py

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    """Count case-insensitive occurrences of whitespace-separated words."""

    def mapper(self, key, line):
        # split() with no argument splits on any run of whitespace (tabs, spaces, ...).
        for word in line.split():
            # On Python 2 the words are byte strings: decode them (dropping
            # invalid UTF-8, as before) so lower() works on text. The old
            # `unicode(...)` builtin does not exist on Python 3, where the line
            # presumably already arrives as text and needs no decoding.
            if isinstance(word, bytes):
                word = word.decode("utf-8", "ignore")
            yield word.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Overwriting WordFrequency.py


In [17]:
!python WordFrequency.py book.txt > wordcount.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/WordFrequency.prk.20160529.093521.721514
Running step 1 of 1...
Streaming final output from /tmp/WordFrequency.prk.20160529.093521.721514/output...
Removing temp directory /tmp/WordFrequency.prk.20160529.093521.721514...


In [18]:
%%file WordFrequencyBetter.py

from mrjob.job import MRJob
import re

# Words are runs of letters, digits, underscores and apostrophes. re.UNICODE
# makes \w match non-ASCII letters too (str patterns already do on Python 3).
WORD_REGEXP = re.compile(r"[\w']+", re.UNICODE)


class MRWordFrequencyCount(MRJob):
    """Count case-insensitive word occurrences, ignoring punctuation."""

    def mapper(self, key, line):
        # Decode before tokenising. On Python 2 the line is a byte string, and
        # matching \w against raw UTF-8 bytes split accented words apart
        # ("café" -> "caf"). Invalid bytes are still dropped.
        if isinstance(line, bytes):
            line = line.decode("utf-8", "ignore")
        for word in WORD_REGEXP.findall(line):
            yield word.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Writing WordFrequencyBetter.py


In [19]:
!python WordFrequencyBetter.py book.txt > wordsbetter.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/WordFrequencyBetter.prk.20160529.105145.601793
Running step 1 of 1...
Streaming final output from /tmp/WordFrequencyBetter.prk.20160529.105145.601793/output...
Removing temp directory /tmp/WordFrequencyBetter.prk.20160529.105145.601793...


In [20]:
%%file WordFrequencySorted.py

from mrjob.job import MRJob
from mrjob.job import MRStep

import re

# Words are runs of letters, digits, underscores and apostrophes; re.UNICODE
# lets \w match non-ASCII letters once the line has been decoded to text.
WORD_REGEXP = re.compile(r"[\w']+", re.UNICODE)


class MRWordFrequencyCount(MRJob):
    """Count words, then emit them ordered by how often they occur.

    Step 1 counts words. Step 2 re-keys each word by its zero-padded count,
    so that sorting keys as strings orders words by frequency.
    """

    # Digits the count is zero-padded to in step 2. String order only matches
    # numeric order while every count fits in this many digits; raise it for
    # larger corpora.
    SORT_KEY_DIGITS = 4

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_make_counts_key,
                   reducer=self.reducer_output_words)
        ]

    def mapper_get_words(self, key, line):
        # Decode first so \w also matches non-ASCII letters on Python 2, where
        # the line is a byte string. The old `unicode(...)` builtin raised
        # NameError on Python 3.
        if isinstance(line, bytes):
            line = line.decode("utf-8", "ignore")
        for word in WORD_REGEXP.findall(line):
            yield word.lower(), 1

    def reducer_count_words(self, key, values):
        yield key, sum(values)

    def mapper_make_counts_key(self, word, count):
        # '%0*d' zero-pads to SORT_KEY_DIGITS digits, e.g. 42 -> '0042'.
        yield '%0*d' % (self.SORT_KEY_DIGITS, int(count)), word

    def reducer_output_words(self, count, words):
        for word in words:
            yield count, word


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Writing WordFrequencySorted.py


In [21]:
!python WordFrequencySorted.py book.txt > wordssorted.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/WordFrequencySorted.prk.20160529.120915.009967
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/WordFrequencySorted.prk.20160529.120915.009967/output...
Removing temp directory /tmp/WordFrequencySorted.prk.20160529.120915.009967...


In [3]:
%%file MoneySpend.py

from mrjob.job import MRJob

class MRMoneyCounter(MRJob):
    """Total amount spent per customer, from customer,item,amount CSV rows."""

    def mapper(self, key, line):
        # Exactly three comma-separated columns; the item ID is not needed.
        customer, _item, spent = line.split(',')
        yield customer, float(spent)

    def reducer(self, userID, amount):
        # `amount` iterates over every individual order amount for the customer.
        grand_total = sum(amount)
        yield userID, grand_total


if __name__ == '__main__':
    MRMoneyCounter.run()

Overwriting MoneySpend.py


In [4]:
!python MoneySpend.py customer-orders.csv > moneyspend.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MoneySpend.prk.20160529.141649.042485
Running step 1 of 1...
Streaming final output from /tmp/MoneySpend.prk.20160529.141649.042485/output...
Removing temp directory /tmp/MoneySpend.prk.20160529.141649.042485...


In [14]:
%%file AmountSpend.py

from mrjob.job import MRJob
from mrjob.job import MRStep

class MRMoneyCounter(MRJob):
    """Total spend per customer, output in ascending order of amount spent.

    Step 1 sums each customer's orders. Step 2 re-keys each total as a
    zero-padded string, so that sorting keys as strings orders by amount.
    """

    # Integer digits the total is zero-padded to in step 2. Totals with more
    # integer digits than this still sort out of numeric order.
    SORT_KEY_DIGITS = 4

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_amount,
                   reducer=self.reducer_total_amount),
            MRStep(mapper=self.mapper_make_amount_key,
                   reducer=self.reducer_output_amount)
        ]

    def mapper_get_amount(self, key, line):
        (userID, itemId, amount) = line.split(',')
        yield userID, float(amount)

    def reducer_total_amount(self, userID, amount):
        yield userID, sum(amount)

    def mapper_make_amount_key(self, userID, orderAmount):
        # The previous format '%04.02f' had a minimum width of 4, which "0.00"
        # already fills, so nothing was ever padded and "999.99" sorted after
        # "1000.00". The width must also cover the decimal point and the two
        # decimals: 4 integer digits -> width 7 -> "0999.99".
        width = self.SORT_KEY_DIGITS + 3
        yield '%0*.2f' % (width, float(orderAmount)), userID

    def reducer_output_amount(self, orderAmount, userIDs):
        for userID in userIDs:
            yield orderAmount, userID


if __name__ == '__main__':
    MRMoneyCounter.run()

Overwriting AmountSpend.py


In [15]:
!python AmountSpend.py customer-orders.csv > amountspend.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/AmountSpend.prk.20160529.170900.398813
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/AmountSpend.prk.20160529.170900.398813/output...
Removing temp directory /tmp/AmountSpend.prk.20160529.170900.398813...


In [18]:
%%file WordFrequencyWithCombiner.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRWordFrequencyCount(MRJob):
    """Count case-insensitive whitespace-separated words, using a combiner."""

    def mapper(self, key, line):
        for word in line.split():
            # On Python 2 the words are byte strings: decode them (dropping
            # invalid UTF-8). The old `unicode(...)` builtin raised NameError
            # on Python 3, where the line is presumably already text.
            if isinstance(word, bytes):
                word = word.decode("utf-8", "ignore")
            yield word.lower(), 1

    def combiner(self, key, values):
        # Pre-sums counts on the map side to shrink the data sent to reducers.
        # Hadoop decides whether to run it (possibly not at all), so it must
        # emit the same (word, partial count) pairs the reducer already accepts.
        yield key, sum(values)

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Overwriting WordFrequencyWithCombiner.py


In [21]:
!python WordFrequencyWithCombiner.py book.txt > wordfrequencyCombiner.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/WordFrequencyWithCombiner.prk.20160529.172509.942843
Running step 1 of 1...
Streaming final output from /tmp/WordFrequencyWithCombiner.prk.20160529.172509.942843/output...
Removing temp directory /tmp/WordFrequencyWithCombiner.prk.20160529.172509.942843...


In [29]:
%%file MostPopularMovie.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MostPopularMovie(MRJob):
    """Find the movie with the most ratings in tab-separated MovieLens data."""

    def steps(self):
        counting = MRStep(mapper=self.mapper_get_ratings,
                          reducer=self.reducer_count_ratings)
        # No mapper in the second step: the counted pairs go straight to the reducer.
        picking = MRStep(reducer=self.reducer_find_max)
        return [counting, picking]

    def mapper_get_ratings(self, key, line):
        # Four tab-separated columns are required; only the movie ID is used.
        _user, movieId, _rating, _stamp = line.split('\t')
        yield movieId, 1

    def reducer_count_ratings(self, movieId, occurences):
        # Key every (count, movieId) pair under None so a single reducer in
        # the next step sees all movies at once.
        yield None, (sum(occurences), movieId)

    def reducer_find_max(self, key, values):
        # Pairs compare by count first, then by movie ID on ties.
        yield max(values)


if __name__ == '__main__':
    MostPopularMovie.run()

Overwriting MostPopularMovie.py


In [30]:
!python MostPopularMovie.py ml-100k/u.data

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MostPopularMovie.prk.20160529.173944.876325
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/MostPopularMovie.prk.20160529.173944.876325/output...
583	"50"
Removing temp directory /tmp/MostPopularMovie.prk.20160529.173944.876325...


In [37]:
%%file MostPopularMovieNice.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MostPopularMovieNice(MRJob):
    """Find the most-rated movie and report it by title instead of by ID."""

    def configure_options(self):
        super(MostPopularMovieNice, self).configure_options()
        # A file option: mrjob ships the file to every task that needs it.
        # NOTE(review): this is the older mrjob API; newer releases name these
        # configure_args / add_file_arg.
        self.add_file_option('--items', help='Path to u.item')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer_count_ratings),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_ratings(self, key, line):
        (userID, movieId, rating, timestamp) = line.split('\t')
        yield movieId, 1

    def reducer_init(self):
        """Load the movie ID -> title table before the first reduce call."""
        self.movieNames = {}
        # Honour --items instead of assuming the file is named "u.item" in the
        # working directory; fall back to that name when the option is omitted.
        path = self.options.items or "u.item"
        # Binary mode plus explicit decoding behaves the same on Python 2 and
        # Python 3 (str.decode does not exist on Python 3).
        with open(path, "rb") as f:
            for line in f:
                fields = line.split(b'|')
                movie_id = fields[0].decode('utf-8', 'ignore')
                self.movieNames[movie_id] = fields[1].decode('utf-8', 'ignore')

    def reducer_count_ratings(self, movieId, occurences):
        # Key everything under None so the next step's single reducer sees
        # every (count, title) pair.
        yield None, (sum(occurences), self.movieNames[movieId])

    def reducer_find_max(self, key, values):
        # Pairs compare by count first, then by title on ties.
        yield max(values)


if __name__ == '__main__':
    MostPopularMovieNice.run()

Overwriting MostPopularMovieNice.py


In [38]:
!python MostPopularMovieNice.py --items=ml-100k/u.item ml-100k/u.data

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MostPopularMovieNice.prk.20160529.175358.910264
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/MostPopularMovieNice.prk.20160529.175358.910264/output...
583	"Star Wars (1977)"
Removing temp directory /tmp/MostPopularMovieNice.prk.20160529.175358.910264...


In [43]:
%%file MostPopularSuperHero.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MostPopularSuperHero(MRJob):
    """Find the Marvel character with the most co-appearance connections."""

    def configure_options(self):
        super(MostPopularSuperHero, self).configure_options()
        # NOTE(review): older mrjob API; newer releases use add_file_arg.
        self.add_file_option('--names', help='Path to Marvel-names.txt')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_count_friends_per_line,
                   reducer=self.reducer_combine_friends),
            MRStep(mapper=self.mapper_prep_for_sort,
                   mapper_init=self.load_name_directory,
                   reducer=self.reducer_find_max_friends)
        ]

    def mapper_count_friends_per_line(self, key, line):
        """Emit (hero ID, number of other IDs listed on this line)."""
        fields = line.split()
        if not fields:
            # Blank line: previously raised IndexError on fields[0].
            return
        yield int(fields[0]), len(fields) - 1

    def reducer_combine_friends(self, heroID, friendCounts):
        # A hero can head several lines of the graph file; add them all up.
        yield heroID, sum(friendCounts)

    def mapper_prep_for_sort(self, heroID, friendCounts):
        # Key everything under None so a single reducer sees every hero.
        yield None, (friendCounts, self.heroNames[heroID])

    def reducer_find_max_friends(self, key, value):
        # Pairs compare by connection count first, then by name on ties.
        yield max(value)

    def load_name_directory(self):
        """Build self.heroNames: integer hero ID -> name, from lines like `1 "NAME"`."""
        self.heroNames = {}
        # Honour --names instead of assuming the file name; keep the old name
        # as the fallback when the option is omitted.
        path = self.options.names or "Marvel-names.txt"
        # Binary mode plus explicit decoding works on both Python 2 and 3.
        with open(path, "rb") as f:
            for line in f:
                fields = line.split(b'"')
                if len(fields) < 2:
                    # Blank or malformed line: previously crashed in int('').
                    continue
                heroID = int(fields[0])
                self.heroNames[heroID] = fields[1].decode('utf-8', 'ignore')


if __name__ == '__main__':
    MostPopularSuperHero.run()

Overwriting MostPopularSuperHero.py


In [44]:
!python MostPopularSuperHero.py --names=Marvel-names.txt Marvel-graph.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MostPopularSuperHero.prk.20160529.185831.785432
Running step 1 of 2...
Running step 2 of 2...
Streaming final output from /tmp/MostPopularSuperHero.prk.20160529.185831.785432/output...
1933	"CAPTAIN AMERICA"
Removing temp directory /tmp/MostPopularSuperHero.prk.20160529.185831.785432...


In [45]:
%%file ProcessMarvel.py

# Call this with one argument: the character ID you are starting from.
# For example, Spider Man is 5306, The Hulk is 2548. Refer to Marvel-names.txt
# for others.
#
# Writes BFS-iteration-0.txt with one ID|EDGES|DISTANCE|COLOR line per line of
# Marvel-graph.txt. The starting character is GRAY at distance 0; everyone
# else is WHITE at distance 9999.

import sys


def build_bfs_line(fields, start_id):
    """Format one whitespace-split graph line as an ID|EDGES|DISTANCE|COLOR node.

    fields[0] is the hero ID and fields[1:] are its connections.
    """
    heroID = fields[0]
    # fields[1:] rather than fields[-numConnections:]: with zero connections
    # the latter is fields[-0:] == fields, which made the hero its own neighbour.
    connections = fields[1:]
    if heroID == start_id:
        color, distance = 'GRAY', 0
    else:
        color, distance = 'WHITE', 9999
    return '|'.join((heroID, ','.join(connections), str(distance), color))


def main(argv):
    """Write the initial BFS input file for the character ID in argv[1]."""
    if len(argv) < 2:
        sys.exit('usage: python ProcessMarvel.py <starting character ID>')
    start_id = argv[1]

    print('Creating BFS starting input for character ' + start_id)

    # Both files are closed by their `with` blocks; no explicit close() needed.
    with open("BFS-iteration-0.txt", 'w') as out:
        with open("Marvel-graph.txt") as f:
            for line in f:
                fields = line.split()
                # split() never yields '' entries, so the old `heroID != ''`
                # guard never fired and blank lines raised IndexError instead.
                if not fields:
                    continue
                out.write(build_bfs_line(fields, start_id))
                out.write("\n")


if __name__ == '__main__':
    main(sys.argv)

Writing ProcessMarvel.py


In [46]:
!python ProcessMarvel.py 2548

Creating BFS starting input for character 2548


In [1]:
%%file BFSIteration.py

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol

class Node:
    """One vertex of the BFS graph, serialised as ID|EDGES|DISTANCE|COLOR.

    A fresh node has no ID, no connections, distance 9999 and color 'WHITE'.
    """

    def __init__(self):
        self.characterID = ''
        self.connections = []
        self.distance = 9999
        self.color = 'WHITE'

    def fromLine(self, line):
        """Populate this node from an ID|EDGES|DISTANCE|COLOR line.

        Lines without exactly four '|'-separated fields are ignored, and the
        node keeps its current values.
        """
        fields = line.split('|')
        if len(fields) == 4:
            self.characterID = fields[0]
            # ''.split(',') is [''], so an empty EDGES field (as written for
            # freshly discovered nodes) used to produce a bogus '' neighbour
            # that was merged into edge lists and expanded in later iterations.
            self.connections = [c for c in fields[1].split(',') if c]
            self.distance = int(fields[2])
            self.color = fields[3]

    def getLine(self):
        """Serialise this node back into an ID|EDGES|DISTANCE|COLOR line."""
        connections = ','.join(self.connections)
        return '|'.join((self.characterID, connections, str(self.distance), self.color))

class MRBFSIteration(MRJob):
    """Run one breadth-first-search expansion over ID|EDGES|DISTANCE|COLOR lines.

    The mapper expands each GRAY node: every neighbour is emitted as a GRAY
    node one step further away, and the expanded node itself turns BLACK. The
    reducer merges all records emitted for the same character ID.
    """

    # Read and write plain text lines instead of mrjob's default JSON encoding.
    INPUT_PROTOCOL = RawValueProtocol
    OUTPUT_PROTOCOL = RawValueProtocol

    def configure_options(self):
        super(MRBFSIteration, self).configure_options()
        self.add_passthrough_option(
            '--target', help="ID of character we are searching for")

    def mapper(self, _, line):
        current = Node()
        current.fromLine(line)

        if current.color == 'GRAY':
            hop = int(current.distance) + 1
            for neighbour in current.connections:
                discovered = Node()
                discovered.characterID = neighbour
                discovered.distance = hop
                discovered.color = 'GRAY'
                # Record in a job counter when the searched-for ID is reached.
                if self.options.target == neighbour:
                    label = ("Target ID " + neighbour +
                             " was hit with distance " + str(discovered.distance))
                    self.increment_counter('Degrees of Separation', label, 1)
                yield neighbour, discovered.getLine()
            # This node has now been expanded.
            current.color = 'BLACK'

        # Always re-emit the node itself so it is not lost from the graph.
        yield current.characterID, current.getLine()

    def reducer(self, key, values):
        # Start from a default node (no edges, distance 9999, WHITE) and fold
        # every record for this ID into it.
        merged = Node()
        merged.characterID = key

        for value in values:
            seen = Node()
            seen.fromLine(value)

            if seen.connections:
                merged.connections.extend(seen.connections)
            if seen.distance < merged.distance:
                merged.distance = seen.distance
            # BLACK wins over everything; GRAY only upgrades a WHITE result.
            if seen.color == 'BLACK':
                merged.color = 'BLACK'
            elif seen.color == 'GRAY' and merged.color == 'WHITE':
                merged.color = 'GRAY'

        yield key, merged.getLine()


if __name__ == '__main__':
    MRBFSIteration.run()

Writing BFSIteration.py


In [2]:
!python BFSIteration.py --target=100 BFS-iteration-0.txt > BFS-iteration-1.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/BFSIteration.prk.20160530.075100.132042
Running step 1 of 1...
Streaming final output from /tmp/BFSIteration.prk.20160530.075100.132042/output...
Removing temp directory /tmp/BFSIteration.prk.20160530.075100.132042...


In [3]:
!python BFSIteration.py --target=100 BFS-iteration-1.txt > BFS-iteration-2.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/BFSIteration.prk.20160530.075141.762876
Running step 1 of 1...
Counters: 1
	Degrees of Separation
		Target ID 100 was hit with distance 2=2
Counters: 1
	Degrees of Separation
		Target ID 100 was hit with distance 2=2
Streaming final output from /tmp/BFSIteration.prk.20160530.075141.762876/output...
Removing temp directory /tmp/BFSIteration.prk.20160530.075141.762876...


In [10]:
%%file MovieSimilarities.py

from mrjob.job import MRJob
from mrjob.step import MRStep
from math import sqrt

from itertools import combinations

class MovieSimilarities(MRJob):
    """Find movie pairs whose ratings are similar, using cosine similarity.

    Step 1 groups each user's (movieID, rating) pairs. Step 2 scores every
    pair of movies rated by the same users. Step 3 replaces IDs with titles
    and keys each result by (movie title, score).
    """

    def configure_options(self):
        super(MovieSimilarities, self).configure_options()
        # NOTE(review): older mrjob API; newer releases use add_file_arg.
        self.add_file_option('--items', help='Path to u.item')

    def load_movie_names(self):
        """Fill self.movieNames with integer movie ID -> title from u.item."""
        self.movieNames = {}
        # Use the file given with --items rather than assuming it is named
        # "u.item"; keep that name as the fallback when the option is omitted.
        path = self.options.items or "u.item"
        # Binary mode plus explicit decoding behaves the same on Python 2 and
        # Python 3 (str.decode does not exist on Python 3).
        with open(path, "rb") as f:
            for line in f:
                fields = line.split(b'|')
                self.movieNames[int(fields[0])] = fields[1].decode('utf-8', 'ignore')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_parse_input,
                   reducer=self.reducer_ratings_by_user),
            MRStep(mapper=self.mapper_create_item_pairs,
                   reducer=self.reducer_compute_similarity),
            MRStep(mapper=self.mapper_sort_similarities,
                   mapper_init=self.load_movie_names,
                   reducer=self.reducer_output_similarities)]

    def mapper_parse_input(self, key, line):
        """Emit userID => (movieID, rating) for one tab-separated rating line."""
        (userID, movieID, rating, timestamp) = line.split('\t')
        yield userID, (movieID, float(rating))

    def reducer_ratings_by_user(self, user_id, itemRatings):
        """Collect all of one user's (movieID, rating) pairs into a single list."""
        yield user_id, [(movieID, rating) for movieID, rating in itemRatings]

    def mapper_create_item_pairs(self, user_id, itemRatings):
        """Emit every pair of movies this user rated, together with both ratings.

        Both orders are emitted so the similarities are bi-directional.
        """
        for (movieID1, rating1), (movieID2, rating2) in combinations(itemRatings, 2):
            yield (movieID1, movieID2), (rating1, rating2)
            yield (movieID2, movieID1), (rating2, rating1)

    def cosine_similarity(self, ratingPairs):
        """Return (cosine similarity, number of pairs) for (x, y) rating pairs.

        The score is 0 when either rating vector has zero length.
        """
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        denominator = sqrt(sum_xx) * sqrt(sum_yy)

        score = 0
        if denominator:
            score = sum_xy / float(denominator)

        return (score, numPairs)

    def reducer_compute_similarity(self, moviePair, ratingPairs):
        """Emit movie pair => (score, co-rating count) for strong, well-supported pairs."""
        score, numPairs = self.cosine_similarity(ratingPairs)

        # Enforce a minimum score and minimum number of co-ratings for quality.
        if numPairs > 10 and score > 0.95:
            yield moviePair, (score, numPairs)

    def mapper_sort_similarities(self, moviePair, scores):
        """Re-key as (movie1 title, score) so results come out grouped per movie."""
        score, n = scores
        movie1, movie2 = moviePair

        yield (self.movieNames[int(movie1)], score), \
            (self.movieNames[int(movie2)], n)

    def reducer_output_similarities(self, movieScore, similarN):
        """Emit movie => (similar movie, score, number of co-ratings)."""
        movie1, score = movieScore
        for movie2, n in similarN:
            yield movie1, (movie2, score, n)


if __name__ == '__main__':
    MovieSimilarities.run()

Overwriting MovieSimilarities.py


In [11]:
!python MovieSimilarities.py --items=ml-100k/u.item ml-100k/u.data > sims.txt

No configs found; falling back on auto-configuration
Creating temp directory /tmp/MovieSimilarities.prk.20160530.094002.129516
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...
Streaming final output from /tmp/MovieSimilarities.prk.20160530.094002.129516/output...
Removing temp directory /tmp/MovieSimilarities.prk.20160530.094002.129516...
