# PageRank algorithm using MapReduce

In [26]:
import xml.etree.ElementTree as ET
import os

def _is_unit_value(raw_value) -> bool:
    """Return True when a link's ``value`` attribute is numerically equal to 1."""
    if raw_value is None:
        return False
    try:
        # Compare numerically so "1", "1.0" and "1.0000" are all accepted.
        return float(raw_value) == 1.0
    except ValueError:
        return False


def _extract_marriage_graph(xml_file_path: str):
    """Read the XML file and return ``(nodes, edges)`` for the PADGM network."""
    root = ET.parse(xml_file_path).getroot()

    # All ids declared under the 'agent' nodeclass; nodes without an id are ignored.
    agent_nodes = root.findall("./MetaNetwork/nodes/nodeclass[@type='agent']/node")
    nodes = {node.get('id') for node in agent_nodes if node.get('id')}

    # Links of the PADGM network whose value is 1 and whose endpoints are both
    # declared agent nodes. Document order (and any duplicates) is preserved.
    edges = []
    for link in root.findall("./MetaNetwork/networks/network[@id='PADGM']/link"):
        if not _is_unit_value(link.get('value')):
            continue
        source = link.get('source')
        target = link.get('target')
        # Membership in `nodes` also rules out missing/empty endpoints.
        if source in nodes and target in nodes:
            edges.append((source, target))

    return nodes, edges


def _write_mrjob_input(output_txt_path: str, nodes, edges) -> None:
    """Write node declarations (sorted by id) followed by the edge lines."""
    with open(output_txt_path, 'w', encoding='utf-8') as f_out:
        for node_id in sorted(nodes):
            f_out.write(f"{node_id}\t!NODE\n")
        for source, target in edges:
            f_out.write(f"{source}\t{target}\n")


def parse_padgett_data(xml_file_path: str, output_txt_path: str) -> bool:
    """
    Parses the Padgett Florentine families XML file to extract nodes
    and marriage links (PADGM network with value equal to 1), writing them
    to a text file suitable for MRJob input.

    Format:
        NODE_ID\t!NODE  (for declaring all nodes)
        SOURCE_ID\tTARGET_ID (for representing marriage links)

    Args:
        xml_file_path (str): Path to the input padgett.xml file.
        output_txt_path (str): Path where the output .txt file will be saved.

    Returns:
        bool: True when the output file was written; False when the input is
        missing, unreadable or malformed, or when the output cannot be written.
        Errors are reported with print() instead of being raised.
    """
    if not os.path.exists(xml_file_path):
        print(f"Error: Input file not found at {xml_file_path}")
        return False

    # Read phase: failures here concern the input file only.
    try:
        nodes, edges = _extract_marriage_graph(xml_file_path)
    except ET.ParseError as e:
        print(f"Error parsing XML file '{xml_file_path}': {e}")
        return False
    except OSError as e:
        print(f"Error reading input file '{xml_file_path}': {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

    # Write phase: kept separate so I/O errors are attributed to the right file.
    try:
        _write_mrjob_input(output_txt_path, nodes, edges)
    except OSError as e:
        print(f"Error writing to output file '{output_txt_path}': {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

    print(f"Successfully parsed '{xml_file_path}' and created '{output_txt_path}'.")
    print(f"Found {len(nodes)} nodes and {len(edges)} marriage links.")
    return True

# Locations of the raw Padgett XML and of the MRJob-ready text file derived from it.
input_xml = './data/padgett.xml'
output_txt = './data/padgett_input.txt'

# Run the conversion; only announce the output path when it reports success.
parsed_ok = parse_padgett_data(input_xml, output_txt)
if parsed_ok:
    print(f"file saved: {output_txt}")

Successfully parsed './data/padgett.xml' and created './data/padgett_input.txt'.
Found 16 nodes and 40 marriage links.
file saved: ./data/padgett_input.txt


In [27]:
%%writefile page_rank.py
# Save this cell's content to page_rank.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import json

class MRPageRank(MRJob):
    """
    Multi-step MRJob that computes PageRank over a tab-separated node/edge list.

    Pipeline (see ``steps``):
      1. One initialization step that builds each node's adjacency list and
         assigns the uniform starting rank ``1 / n_nodes``.
      2. ``--iterations`` identical PageRank update steps.

    Every step emits ``(node, json_string)`` where the JSON string encodes
    ``[rank, [neighbor, ...]]``.

    Command-line options (all optional; the defaults are the class constants):
      --iterations       number of PageRank update steps (default N_ITERATIONS)
      --n-nodes          total number of nodes in the graph (default N_NODES)
      --damping-factor   damping factor d of the update rule (default DAMPING_FACTOR)
    """
    N_NODES = 16
    DAMPING_FACTOR = 0.85
    N_ITERATIONS = 50

    def configure_args(self):
        """Register the job's tunable parameters as passthrough options."""
        super().configure_args()
        self.add_passthru_arg(
            '--iterations', dest='iterations', type=int,
            default=self.N_ITERATIONS,
            help='number of PageRank update steps to run')
        self.add_passthru_arg(
            '--n-nodes', dest='n_nodes', type=int,
            default=self.N_NODES,
            help='total number of nodes in the graph (must be positive)')
        self.add_passthru_arg(
            '--damping-factor', dest='damping_factor', type=float,
            default=self.DAMPING_FACTOR,
            help='damping factor used in the rank update')

    # INITIALIZATION STEPS
    def mapper_init_graph(self, _, line):
        """
        Mapper for initialization step.
        Input: One line of the parsed node/edge text file
               (e.g., "MEDICI\t!NODE" or "ALBIZZI\tGINORI")
        Output: Yields (node, neighbor_or_marker) pairs.
                Key: The source node (family name).
                Value: Either the target node (neighbor family name)
                       or the '!NODE' marker.
        Lines that do not contain a tab after stripping are skipped.
        """
        line = line.strip()

        # Split only on the first tab: "<node>\t<neighbor or !NODE>"
        parts = line.split('\t', 1)

        if len(parts) == 2:
            node_key, value_part = parts
            # The reducer groups these values by node_key
            yield node_key, value_part

    def reducer_init_graph(self, node, values):
        """
        Reducer for initialization step.
        Input: node (family name), iterator of values ('!NODE' or neighbor names)
        Output: Yields (node, json_string_of_tuple) where tuple is (initial_rank, [neighbor_list])
                The value is emitted as a JSON string; the iteration mapper
                decodes it with json.loads.
        """
        received = list(values)

        # Nothing received for this key: emit nothing.
        if not received:
            return

        # Everything except the '!NODE' marker is an outgoing neighbor.
        # Sorted for easier debugging.
        neighbors = sorted(value for value in received if value != '!NODE')

        # Uniform starting rank.
        initial_rank = 1.0 / self.options.n_nodes

        yield node, json.dumps((initial_rank, neighbors))

    # RANK CALCULATION STEPS
    def mapper_pagerank_iter(self, node, value_str):
        """
        Mapper for a single PageRank iteration.
        Input: key=node (family name), value=JSON string "[current_rank, [neighbors]]"
            (e.g., node="MEDICI", value_str="[0.0625, [\"ACCIAIUOL\", ...]]")
        Output:
            1. Yields (node, ('NODE', [neighbors])) to pass graph structure along.
            2. Yields (neighbor, ('RANK', contribution)) for each neighbor.
        Records that cannot be decoded/unpacked are skipped and counted under
        the ('Error', 'JSONDecodeError') counter.
        """
        try:
            current_rank, neighbors = json.loads(value_str)
        except (json.JSONDecodeError, ValueError, TypeError):
            # ValueError: wrong number of items to unpack.
            # TypeError: value is not a string, or decodes to a non-iterable.
            self.increment_counter('Error', 'JSONDecodeError', 1)
            return

        # 1. Emit the graph structure information for this node.
        yield node, ('NODE', neighbors)

        # 2. Split the current rank evenly among the outgoing neighbors.
        #    A dangling node (no outgoing links) emits no contributions, so
        #    its rank is not redistributed: the reducer only sums what it
        #    receives, and the total rank over all nodes can therefore drop
        #    below 1 when dangling nodes exist.
        if neighbors:
            contribution = current_rank / len(neighbors)
            for neighbor in neighbors:
                yield neighbor, ('RANK', contribution)

    def reducer_pagerank_iter(self, node, values):
        """
        Reducer for a single PageRank iteration.
        Input: key=node (family name),
            value=iterator of tuples like ('NODE', [neighbors]) or ('RANK', contribution)
        Output: Yields (node, json_string_of_tuple) where tuple is (new_rank, [neighbor_list])
                (same format as the input to the iteration mapper)
        A key that received no ('NODE', ...) record is not emitted and is
        counted under ('Error', 'NodeInfoMissingInReducer').
        """
        total_received_rank = 0.0
        neighbors = []
        node_info_found = False

        n_nodes = self.options.n_nodes
        damping_factor = self.options.damping_factor

        for value_type, data in values:
            if value_type == 'NODE':
                neighbors = data
                node_info_found = True
            elif value_type == 'RANK':
                total_received_rank += data

        if not node_info_found:
            # Rank was sent to a key that has no structure record.
            self.increment_counter('Error', 'NodeInfoMissingInReducer', 1)
            return

        # new_rank = (1 - d) / N + d * (sum of received contributions)
        new_rank = (1 - damping_factor) / n_nodes + (damping_factor * total_received_rank)

        # Keep the neighbor list so the next iteration can reuse it.
        yield node, json.dumps((new_rank, neighbors))

    def steps(self):
        """Define the sequence of MapReduce steps.

        Returns the initialization step followed by ``--iterations``
        identical PageRank update steps.
        """
        init_step = MRStep(mapper=self.mapper_init_graph,
                           reducer=self.reducer_init_graph)

        iteration_steps = [
            MRStep(mapper=self.mapper_pagerank_iter,
                   reducer=self.reducer_pagerank_iter)
            for _ in range(self.options.iterations)
        ]

        return [init_step] + iteration_steps

# Entry point: only when this file is executed as a script (not imported),
# launch the job through MRPageRank.run().
if __name__ == '__main__':
    MRPageRank.run()

Overwriting page_rank.py


In [28]:
# Run the job on the parsed input and redirect its standard output to a results file.
# NOTE: the shell redirection does not create ./output — the directory must already exist.
!python page_rank.py ./data/padgett_input.txt > ./output/step1_output_50.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/49/4w4d2pt94bjc7rwd_ckjbhzh0000gn/T/page_rank.tilen.20250504.000550.350583
Running step 1 of 51...
Running step 2 of 51...
Running step 3 of 51...
Running step 4 of 51...
Running step 5 of 51...
Running step 6 of 51...
Running step 7 of 51...
Running step 8 of 51...
Running step 9 of 51...
Running step 10 of 51...
Running step 11 of 51...
Running step 12 of 51...
Running step 13 of 51...
Running step 14 of 51...
Running step 15 of 51...
Running step 16 of 51...
Running step 17 of 51...
Running step 18 of 51...
Running step 19 of 51...
Running step 20 of 51...
Running step 21 of 51...
Running step 22 of 51...
Running step 23 of 51...
Running step 24 of 51...
Running step 25 of 51...
Running step 26 of 51...
Running step 27 of 51...
Running step 28 of 51...
Running step 29 of 51...
Running step 30 of 51...
Running step 31 of 51...
Running step 32 of 51...
Runni