# Homework 4 Page Rank with Map Reduce

## Problem description
Use PageRank with MapReduce to estimate the strength of the Florentine families. Then find another graph dataset with more edges and nodes and test on that one as well.

In [2]:
import xml.etree.ElementTree as ET
import mrjob

In [24]:
def parse_dynetml(xml_file):
    """Parse a DyNetML file into a node index and per-network adjacency records.

    Args:
        xml_file: File name or file object accepted by ``ET.parse``.

    Returns:
        A tuple ``(nodes, networks)`` where

        * ``nodes`` maps each node's ``id`` attribute to a zero-based
          integer index, unique across all ``nodeclass`` elements;
        * ``networks`` maps each network's ``id`` attribute to a list
          indexed by node index. Every entry has the form
          ``[initial_rank, out_degree, neighbors]`` with
          ``initial_rank = 1 / number_of_nodes``.
    """
    root = ET.parse(xml_file).getroot()

    # Convert node names to integer ids so they are easier to process.
    # The index is the number of nodes registered so far, which keeps ids
    # unique even when the file contains several node classes (a per-class
    # enumerate would restart at 0 and make nodes collide).
    nodes = {}
    for node_class in root.findall('.//nodeclass'):
        for node in node_class.findall('.//node'):
            nodes.setdefault(node.attrib['id'], len(nodes))
    node_count = len(nodes)

    # Extract networks
    networks = {}
    for network in root.findall('.//network'):
        adjacency = [[] for _ in range(node_count)]
        for link in network.findall('.//link'):
            # A value of zero is treated as "no relationship"; otherwise
            # the graph would be fully connected.
            if float(link.attrib['value']) > 0:
                source = nodes[link.attrib['source']]
                target = nodes[link.attrib['target']]
                adjacency[source].append(target)

        # Every node starts with the uniform rank 1 / N.
        networks[network.attrib['id']] = [
            [1.0 / node_count, len(targets), targets]
            for targets in adjacency
        ]

    return nodes, networks

nodes, networks = parse_dynetml('padgett.xml')

# Show which integer position corresponds to which family name
print("Nodes:")
for position, family in enumerate(nodes):
    print(f"Node ID: {position} - Node Name: {family}")


# Only the PADGB network (business relationships between the Florentine
# families) is exported; PADGM is skipped. The choice is arbitrary because
# a single network is enough for testing.
with open('PADGB.txt', 'w') as out:
    for name in networks:
        if name != 'PADGM':
            out.writelines(
                f"{row}: {record}\n"
                for row, record in enumerate(networks[name])
            )

Nodes:
Node ID: 0 - Node Name: ACCIAIUOL
Node ID: 1 - Node Name: ALBIZZI
Node ID: 2 - Node Name: BARBADORI
Node ID: 3 - Node Name: BISCHERI
Node ID: 4 - Node Name: CASTELLAN
Node ID: 5 - Node Name: GINORI
Node ID: 6 - Node Name: GUADAGNI
Node ID: 7 - Node Name: LAMBERTES
Node ID: 8 - Node Name: MEDICI
Node ID: 9 - Node Name: PAZZI
Node ID: 10 - Node Name: PERUZZI
Node ID: 11 - Node Name: PUCCI
Node ID: 12 - Node Name: RIDOLFI
Node ID: 13 - Node Name: SALVIATI
Node ID: 14 - Node Name: STROZZI
Node ID: 15 - Node Name: TORNABUON


The section below saves its code to a separate file. This is the actual MapReduce job, which we define using the mrjob library. The job cannot be executed from the notebook, and the runner I use later needs it to be in a separate file. Also, the jobs read their input from files line by line; they cannot receive input arguments. This is probably because they are meant to be executed on separate cluster nodes, where we expect big data that is divided between the nodes and cannot be loaded into memory.

In [22]:
%%file main.py
from mrjob.job import MRJob
import xml.etree.ElementTree as ET
import ast
from mrjob.step import MRStep


# We use global variables which is a bit hacky
# But the goal of the exercise was to get used to map reduce
# and get a feel for it. Because of this it was allowed
# Sum of the ranks of all dangling nodes; reset in ip_mapper,
# accumulated in ip_reducer and read in reducer
dangling_sum = 0
# Damping factor used in the page rank formula in reducer
s = 0.85
# Largest node id seen so far plus one; updated in ip_mapper
node_count = 0

class MRPageRank(MRJob):
    """Two-step mrjob job that performs one PageRank iteration.

    Every input line has the form
    ``<node id>: [<rank>, <neighbor count>, [<neighbor ids>]]``.
    The job emits, per node id, the list
    ``[new_rank, neighbor_count, neighbors, old_rank]``.

    The steps share state through the module-level globals
    ``dangling_sum``, ``node_count`` and ``s``, so the mappers and
    reducers have to run inside the same Python process.
    """

    # Define two steps of the job
    # First for each dangling node it sends its probability to a reducer
    # Which we use to calculate the sum of probabilities for all dangling nodes
    # In the second step we recalculate the page ranks
    def steps(self):
        """Return the dangling-sum step followed by the rank-update step."""
        return [
            MRStep(mapper=self.ip_mapper,
                   reducer=self.ip_reducer),
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]

    # If a node is dangling it sends its probability to node 1
    # All nodes also send their data forward so it can be used
    # in the second part of the job
    def ip_mapper(self, _, line):
        """Parse one input line and forward the node record.

        Yields ``(1, [rank])`` for a node without neighbors, and always
        ``(node_id, (rank, neighbor_count, neighbors))``.
        """
        global node_count, dangling_sum
        idx, record = line.split(':', 1)
        key = int(idx)

        # Start every iteration from a clean sum; the reducers of this
        # step add the dangling ranks back in.
        dangling_sum = 0

        # Here we calculate the number of nodes
        # and save it in the global variable node_count
        # These values can be also passed through the job as
        # well but this was simpler to implement
        node_count = max(node_count, key + 1)

        parsed = ast.literal_eval(record)
        p_self, neighbor_count, neighbors = parsed[0], parsed[1], parsed[2]

        if neighbor_count == 0:
            # One-element list marks a dangling contribution
            yield 1, [p_self]

        yield key, (p_self, neighbor_count, neighbors)

    # For each dangling node we add to the sum
    # If a node is not dangling it would have sent
    # all the data about itself so we just propagate it
    # forward.
    def ip_reducer(self, key, values):
        """Accumulate dangling ranks and pass node records through."""
        global dangling_sum
        for value in values:
            if len(value) == 1:
                dangling_sum += float(value[0])
            else:
                yield key, value

    # For each neighbor the method sends the conditional probability
    # to go to it given we are for sure going to
    # a neighboring node. We assume each neighbor has an equal chance
    # to be chosen. The data is again propagated
    # to ourselves so it can be used to calculate the new page rank
    def mapper(self, key, vals):
        """Spread the node's rank evenly over its neighbors.

        Yields ``(neighbor, rank / neighbor_count)`` per neighbor and
        ``(key, (0.0, rank, neighbor_count, neighbors))`` for the node
        itself.
        """
        p_self, neighbor_count, neighbors = vals
        for neigh in neighbors:
            yield neigh, p_self / neighbor_count

        yield key, (0.0, p_self, neighbor_count, neighbors)

    # For each node we calculate the new page rank
    # by first taking the sum of probabilities of neighbors
    # coming to the node and then we use the page rank formula
    # In the end we output the new page rank and the data about the node
    # so it is easier to update the files with the data for the next iteration
    # and so I can calculate the change to see if we converged
    def reducer(self, key, values):
        """Compute the new rank of one node.

        Yields ``(key, [new_rank, neighbor_count, neighbors, old_rank])``.
        """
        # Defaults cover a node that only appears as the target of an
        # edge and never sent its own record: it is treated as a node
        # without neighbors and with an old rank of zero instead of
        # failing on unbound variables.
        p_old = 0.0
        neighbor_count = 0
        neighbors = []

        prob_sum = 0
        for val in values:
            if isinstance(val, (list, tuple)):
                # The node's own record: (0.0, rank, count, neighbors)
                p_old = val[1]
                neighbor_count = val[2]
                neighbors = val[3]
            else:
                # A rank contribution sent by an incoming neighbor
                prob_sum += val

        p_new = s * prob_sum + (s * dangling_sum + 1.0 - s) / node_count
        yield key, [p_new, neighbor_count, neighbors, p_old]


# Entry point used when main.py is executed directly as a script
if __name__ == '__main__':
    MRPageRank.run()

Overwriting main.py


In the section below I use the mrjob runner to execute the Python MRJob. Normally the job is sent to a Hadoop cluster, which is why the runner and the actual job script cannot be in the same file. After each run of the job I overwrite the input file with the new page ranks and then call the job again until we converge. We consider the ranks converged when the sum of the changes to the probabilities becomes less than the tolerance.

In [25]:
import logging
from main import MRPageRank

# Stop warnings for no configuration file:
# only messages of level ERROR and above from the 'mrjob' logger are kept
logging.getLogger('mrjob').setLevel(logging.ERROR)


def pagerank(fileName, tolerance = 0.00001):
    """Run MRPageRank on ``fileName`` repeatedly until the ranks converge.

    After every run the input file is overwritten with the new ranks so
    that it serves as input for the next iteration. Iteration stops once
    the summed absolute change of all ranks is at most ``tolerance``.

    Args:
        fileName: Path of the graph file; every line has the form
            ``<node id>: [<rank>, <neighbor count>, [<neighbor ids>]]``.
        tolerance: Largest total rank change that counts as converged.

    Raises:
        ValueError: If the ranks produced by an iteration do not sum to 1
            (up to floating point rounding).
    """
    import math

    mr_job = MRPageRank(args=[fileName])

    change = 5
    iteration = 0
    while change > tolerance:
        iteration += 1
        # The change printed here is the one measured in the previous
        # iteration (or the initial placeholder on the first pass).
        print(f'iteration: {iteration}, change: {format(change, ".6f")}')

        rank_total = 0.0
        change = 0.0
        with mr_job.make_runner() as runner:
            runner.run()
            # Update probabilities based on results
            with open(fileName, 'w') as f:
                for node_id, record in mr_job.parse_output(runner.cat_output()):
                    new_rank = record[0]
                    old_rank = record[3]
                    rank_total += new_rank
                    change += abs(new_rank - old_rank)
                    f.write(f'{node_id}: {record[:3]}\n')

        # Summing many floats rarely gives exactly 1.0, so compare with a
        # small absolute tolerance instead of using "!=".
        if not math.isclose(rank_total, 1.0, abs_tol=1e-6):
            raise ValueError(f"Error: {rank_total}")


# Run page rank on the Florentine families network stored in PADGB.txt
pagerank('PADGB.txt')

iteration: 1, change: 5.000000


iteration: 2, change: 0.725933
iteration: 3, change: 0.284351
iteration: 4, change: 0.213954
iteration: 5, change: 0.151908
iteration: 6, change: 0.120915
iteration: 7, change: 0.086072
iteration: 8, change: 0.069637
iteration: 9, change: 0.050338
iteration: 10, change: 0.041161
iteration: 11, change: 0.030546
iteration: 12, change: 0.025035
iteration: 13, change: 0.019671
iteration: 14, change: 0.015674
iteration: 15, change: 0.012981
iteration: 16, change: 0.010598
iteration: 17, change: 0.009008
iteration: 18, change: 0.007657
iteration: 19, change: 0.006508
iteration: 20, change: 0.005532
iteration: 21, change: 0.004702
iteration: 22, change: 0.003997
iteration: 23, change: 0.003397
iteration: 24, change: 0.002888
iteration: 25, change: 0.002455
iteration: 26, change: 0.002086
iteration: 27, change: 0.001774
iteration: 28, change: 0.001507
iteration: 29, change: 0.001281
iteration: 30, change: 0.001089
iteration: 31, change: 0.000926
iteration: 32, change: 0.000787
iteration: 33, c

Looking at the file with the results, we can see that the Medici family has the highest page rank, which means they are the strongest; this makes sense. On the other hand, families with no business relations (dangling nodes) have the lowest ranks, which is what would be expected.

## "Large" dataset

I found a web subgraph dataset of all the websites that point to epa.gov at this link: https://www.cs.cornell.edu/courses/cs685/2002fa/ (in the "Network datasets" section). It has around 9000 edges and 5000 nodes. The dataset is self-explanatory: node rows start with `n` and contain the node's id and name, while edge rows start with `e` and contain just the source id and the destination id. Below I parse it and run page rank.

In [17]:
import collections

def parse_graph(file_name):
    """Read a graph file made of ``n <id> <name>`` and ``e <src> <dst>`` rows.

    Args:
        file_name: Path of the text file to read.

    Returns:
        A tuple ``(graph, nodes)`` where ``graph`` is a
        ``collections.defaultdict(list)`` mapping a source id to the list
        of its destination ids, and ``nodes`` is the set of ids declared
        by the node rows. Lines that start with neither ``n`` nor ``e``
        are ignored.
    """
    graph = collections.defaultdict(list)
    nodes = set()
    with open(file_name, 'r') as file:
        for line in file:
            # Take node lines and save all the node ids
            if line.startswith('n'):
                # Split at most twice so a node name that itself contains
                # whitespace stays in one piece instead of breaking the
                # three-way unpacking.
                _, node_id, _ = line.split(maxsplit=2)
                nodes.add(int(node_id))
            # Take edge lines and add edges to graph
            elif line.startswith('e'):
                _, source, destination = line.split()
                graph[int(source)].append(int(destination))
    return graph, nodes

def write_output(graph, nodes, output_file_name):
    """Write one ``<id>: [<rank>, <neighbor count>, <neighbors>]`` line per node.

    Every node gets the same starting rank ``1 / len(nodes)``; nodes that
    have no entry in ``graph`` are written with an empty neighbor list.
    """
    total = len(nodes)

    def _row(node_id):
        # Look the neighbors up with .get so that the mapping is not modified
        successors = graph.get(node_id, [])
        return f'{node_id}: [{1.0/total}, {len(successors)}, {successors}]\n'

    with open(output_file_name, 'w') as out:
        out.writelines(_row(node_id) for node_id in nodes)

# Parse the raw web subgraph and store it in the line-per-node format
# that is used as input for the page rank job
graph, nodes = parse_graph('./web_subgraph.txt')
write_output(graph, nodes, 'web-parsed.txt')

In [18]:
# Run page rank on the converted web subgraph
pagerank('web-parsed.txt')

iteration: 1, change: 5.000000
iteration: 2, change: 0.961941
iteration: 3, change: 0.213348
iteration: 4, change: 0.062813
iteration: 5, change: 0.023719
iteration: 6, change: 0.010736
iteration: 7, change: 0.006698
iteration: 8, change: 0.004646
iteration: 9, change: 0.003602
iteration: 10, change: 0.002838
iteration: 11, change: 0.002317
iteration: 12, change: 0.001903
iteration: 13, change: 0.001586
iteration: 14, change: 0.001326
iteration: 15, change: 0.001116
iteration: 16, change: 0.000941
iteration: 17, change: 0.000796
iteration: 18, change: 0.000673
iteration: 19, change: 0.000571
iteration: 20, change: 0.000484
iteration: 21, change: 0.000411
iteration: 22, change: 0.000349
iteration: 23, change: 0.000296
iteration: 24, change: 0.000251
iteration: 25, change: 0.000213
iteration: 26, change: 0.000181
iteration: 27, change: 0.000154
iteration: 28, change: 0.000131
iteration: 29, change: 0.000111
iteration: 30, change: 0.000094
iteration: 31, change: 0.000080
iteration: 32, ch

We converge again, this time in 43 iterations, but it took around 1 minute and 20 seconds instead of 20 seconds. Since the dataset is bigger, I sorted the results separately and found that https://www.usgs.gov/ has the highest page rank. I do not know the site, but this was my result.