In [1]:
# Enable IPython's autoreload extension in mode 2 so that modules imported below
# are reloaded as we modify them (e.g. after a %%writefile cell rewrites them)
%load_ext autoreload
%autoreload 2

In [37]:
# Utilize last week's HW to count the number of nodes in the graph
from MRJob_Explore import explore

def countNodes(filename):
    """Count the nodes of the graph stored in `filename`.

    Runs the `explore` MRJob with `--exploreType nodes`, prints the count found
    in each output record and returns the count of the last record.

    Args:
        filename: input path handed to the `explore` job.

    Returns:
        The second field of the last record parsed from the job output.

    Raises:
        ValueError: if the job streamed no output records, so there is no
            count to return.
    """
    mr_job = explore(args=[filename, '--no-strict-protocols', '--exploreType', 'nodes'])

    # Stays None until at least one output record has been parsed
    numNodes = None

    with mr_job.make_runner() as runner:
        runner.run()

        for line in runner.stream_output():
            out = mr_job.parse_output_line(line)
            numNodes = out[1]
            # Single-argument print() emits the same text under Python 2 and 3
            print('Number of nodes = ' + '{:,d}'.format(numNodes))

    if numNodes is None:
        raise ValueError('explore job produced no output for ' + repr(filename))

    return numNodes

In [63]:
%%writefile PageRank_Iterate.py
from __future__ import division
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONProtocol

class iterate(MRJob):
    """One PageRank distribution step (teleportation is not accounted for yet).

    Each input record is read through JSONProtocol as
    ``node -> [neighbors, PageRank]``, where ``neighbors`` is a dict keyed by
    neighbor node. Each output record has the same shape, with the PageRank
    replaced by the mass the node received from its in-links.

    The mass held by nodes without out-links is emitted under the special key
    ``'*dangling'`` so that it is not lost.
    """

    INPUT_PROTOCOL = JSONProtocol

    #------------------
    # Configurations:
    # - --numNodes is accepted as a passthrough option; no step in this class
    #   reads it yet

    def configure_options(self):
        super(iterate, self).configure_options()
        self.add_passthrough_option('--numNodes', default=1)

    #------------------
    # Mapper:
    # - Find the number of neighbors for the node
    # - Distribute current PageRank among all neighbors
    # - If there are no neighbors, keep track of dangling mass

    def mapper_dist(self, key, value):
        """Spread a node's PageRank evenly over its neighbors.

        Yields ``(neighbor, share)`` for every neighbor, or
        ``('*dangling', PageRank)`` when the node has none, followed by
        ``(key, neighbors)`` so the graph structure survives the step.
        """
        neighbors = value[0]
        PageRank = value[1]
        numNeighbors = len(neighbors)

        if numNeighbors > 0:
            # True division: the module imports division from __future__
            share = PageRank / numNeighbors
            for neighbor in neighbors:
                yield neighbor, share
        else:
            # Dangling node: emit its mass under a dedicated key. Writing it to
            # a local file from inside a task (as attempted before) is not
            # valid Python as written and would be overwritten on every call.
            yield '*dangling', PageRank

        # Maintain the graph structure
        yield key, neighbors

    #------------------
    # Reducer:
    # - For each node, accumulate PageRank distributed from other nodes
    # - Maintain graph structure

    def reducer_dist(self, key, values):
        """Sum the PageRank shares sent to `key` and re-attach its neighbors.

        Yields a single ``(key, (neighbors, new_PageRank))`` record; a key
        that never received a neighbor dict gets an empty one.
        """
        new_PageRank = 0.0
        neighbors = {}

        for val in values:
            if isinstance(val, dict):
                neighbors = val
            elif isinstance(val, (int, float)):
                # Also accept ints: a dangling node whose PageRank decoded as
                # an integer would otherwise be dropped silently
                new_PageRank += val

        yield key, (neighbors, new_PageRank)

    #------------------
    # Pipeline:

    def steps(self):
        """Single map/reduce step."""
        return [
            MRStep(mapper=self.mapper_dist,
                   reducer=self.reducer_dist)
            ]

# Run the job only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    iterate.run()

Overwriting PageRank_Iterate.py


In [64]:
from PageRank_Iterate import iterate

def iteratePR(filename, n, runner, outputDir):
    
    if runner == 'local':
        mr_job = iterate(args=[filename, '--no-strict-protocols', '--numNodes=' + str(n), 
                               '--file', 'danglingMass.txt'])
        
    elif runner == 'hadoop':
        !hdfs dfs -rm -r {outputDir}
        mr_job = iterate(args=[filename, '--no-strict-protocols', '-r', 'hadoop', '--hadoop-home', '/usr/',
                               '--output-dir', outputDir, '--numNodes=' + str(n), '--file', 'danglingMass.txt'])
        
    elif runner == 'emr':
        !aws s3 rm --quiet {outputDir}
        mr_job = iterate(args=[filename, '--no-strict-protocols', '--no-output', '--numNodes=' + str(n),
                               '-r', 'emr', '--emr-job-flow-id', clusterId, '--output-dir', outputDir,
                               '--file', 'danglingMass.txt'])
        

    with mr_job.make_runner() as runner:
        runner.run()

        if runner != 'emr':
            for line in runner.stream_output():
                print mr_job.parse_output_line(line)

In [66]:
# Count the nodes of the test graph; the count is forwarded to the job as --numNodes
n = countNodes('PageRank-test.txt')

# Run a single PageRank iteration on Hadoop; the records it produces are printed below
iteratePR('PageRank-testInitialized.txt', n, 'hadoop', '/user/miki/week09/iterate')

Number of nodes = 11
rm: `/user/miki/week09/iterate': No such file or directory
('*dangling', [{}, 0.09090909090909091])
('A', [{}, 0.045454545454545456])
('B', [{'C': 1}, 0.3484848484848485])
('C', [{'B': 1}, 0.09090909090909091])
('D', [{'A': 1, 'B': 1}, 0.030303030303030304])
('E', [{'B': 1, 'D': 1, 'F': 1}, 0.36363636363636365])
('F', [{'B': 1, 'E': 1}, 0.030303030303030304])
('G', [{'B': 1, 'E': 1}, 0.0])
('H', [{'B': 1, 'E': 1}, 0.0])
('I', [{'B': 1, 'E': 1}, 0.0])
('J', [{'E': 1}, 0.0])
('K', [{'E': 1}, 0.0])


In [12]:
%%writefile PageRank_TopN.py
from __future__ import division
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONProtocol

class topN(MRJob):
    """Emit the first `--top` (PageRank, node) pairs seen by the reducer.

    Input records are read through JSONProtocol as
    ``node -> [neighbors, PageRank]``. The step's jobconf asks Hadoop for a
    single reducer and a numeric, reversed comparison on the first key field.
    """

    INPUT_PROTOCOL = JSONProtocol

    #------------------
    # Configurations:
    # - --top: how many records to keep (integer, default 10)

    def configure_options(self):
        super(topN, self).configure_options()
        self.add_passthrough_option('--top', default=10, type='int')

    #------------------
    # Mapper:
    # - Drop the graph structure
    # - Re-key each record by its PageRank

    def mapper(self, key, value):
        pageRank = value[1]
        yield pageRank, key

    #------------------
    # Reducer:
    # - Pass records through until `--top` of them have been emitted

    def reducer_init(self):
        # Number of records this reducer task has emitted so far
        self.seen = 0

    def reducer(self, key, values):
        limit = self.options.top

        for node in values:
            if self.seen >= limit:
                # Quota reached: keep consuming the values, emit nothing more
                continue
            yield key, node
            self.seen += 1

    #------------------
    # Pipeline:

    def steps(self):
        sortConf = {
            'mapred.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            'mapred.text.key.partitioner.options': '-k1,1',
            'stream.num.map.output.key.fields': 1,
            'mapred.text.key.comparator.options': '-k1,1nr',
            'mapred.reduce.tasks': 1,
        }
        step = MRStep(mapper=self.mapper,
                      reducer_init=self.reducer_init,
                      reducer=self.reducer,
                      jobconf=sortConf)
        return [step]

        
# Run the job only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    topN.run()

Overwriting PageRank_TopN.py


In [13]:
from PageRank_TopN import topN

def topNPR(filename, n, runnerType, outputDir, printOutput):

    output = []
    
    if runnerType == 'local':
        mr_job = topN(args=[filename, '--no-strict-protocols', '--top=' + str(n)])

    elif runnerType == 'hadoop':
        !hdfs dfs -rm -r {outputDir}
        mr_job = topN(args=[filename, '--no-strict-protocols', '-r', 'hadoop', '--hadoop-home', '/usr/',
                            '--output-dir', outputDir, '--top=' + str(n)])

    elif runnerType == 'emr':
        !aws s3 rm --quiet {outputDir}
        mr_job = topN(args=[filename, '--no-strict-protocols', '--no-output', '--top=' + str(n),
                            '-r', 'emr', '--emr-job-flow-id', clusterId, '--output-dir', outputDir])


    with mr_job.make_runner() as runner:
        runner.run()

        if runnerType != 'emr':
            for line in runner.stream_output():
                out = mr_job.parse_output_line(line)
                output.append(out)
                if printOutput:
                    print out

        if runnerType == 'local':
            with open(outputDir, 'w') as f:
                for line in output:
                    f.writelines('"' + str(line[0]) + '"\t' + str(line[1]) + '\n' )

In [15]:
# Local file that will hold a copy of the PageRank result
inputFile = 'PageRank-result.txt'

# Do this so JSON protocol works:
# replace any existing local copy with the result part file fetched from HDFS
!rm {inputFile}
!hdfs dfs -copyToLocal /user/miki/week09/result/part-00000 {inputFile}

# HDFS location for the top-N job output
outputDir = '/user/miki/week09/top'

# Number of top-ranked nodes to report
n = 5

# Run on Hadoop and print the returned records
topNPR(inputFile, n, 'hadoop', outputDir, True)

Deleted /user/miki/week09/top


The have been translated as follows
 mapred.text.key.comparator.options: mapreduce.partition.keycomparator.options
mapred.text.key.partitioner.options: mapreduce.partition.keypartitioner.options
mapred.reduce.tasks: mapreduce.job.reduces
mapred.output.key.comparator.class: mapreduce.job.output.key.comparator.class


(0.2875607312797395, 'C')
(0.2606908965689958, 'B')
(0.11164819684447784, 'F')
(0.11164819684447784, 'D')
(0.09941334835962937, 'E')
