In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.3.1-bin-hadoop3')
findspark.find()

'/home/ubuntu/spark-3.3.1-bin-hadoop3'

In [2]:
import re 
import sys
from operator import add
from typing import Iterable, Tuple
from pyspark.resultiterable import ResultIterable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
#from helper import parseNeighborURLs, wrap_neighbor, combine_neighbors

In [None]:

    
def parseNeighborURLs(urls: str) -> Tuple[str, str]:
    """
    Parses a string containing two URLs separated by whitespace and returns them as a tuple.

    This function splits the input string using regular expressions to handle cases with 
    multiple spaces or tabs. It returns a tuple where:
    - The first element is the source URL.
    - The second element is the destination URL (linked URL).

    Args:
        urls (str): A string containing two URLs separated by whitespace.

    Returns:
        Tuple[str, str]: A tuple containing (source_url, destination_url).
"""
    parts = re.split(r'\s+', urls.strip())
    return parts[0], parts[1]

# Helper function to calculate rank contributions from neighbors
def calculateRankContrib(urls, rank: float) -> Iterable[Tuple[str, float]]:
    """
    Distributes the rank of a URL equally among its outbound links.
    This function computes how much rank each outbound link should receive from the source URL 
    based on the current rank of the source URL and the number of outbound links.

    Args:
        urls (Iterable[str]): A list of outbound URLs from the source URL.
        rank (float): The current rank of the source URL.

    Yields:
        Iterable[Tuple[str, float]]: An iterable of (url, rank_contribution) pairs,
        where rank_contribution = rank / num_urls.
"""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def initialize_rank(url_neighbors: Tuple[str, Iterable[Tuple[str, str]]]) -> Tuple[str, float]:
    """
    Initializes the rank of a URL to 1.0.

    This function is used at the start of the PageRank computation. 
    Every URL is initialized with an equal starting rank.

    Args:
        url_neighbors (Tuple[str, Iterable[str]]): A tuple where:
            - The first element is the URL.
            - The second element is an iterable of neighboring URLs (linked URLs).

    Returns:
        Tuple[str, float]: A tuple containing the URL and its initial rank of 1.0.
"""
    url, _ = url_neighbors
    return (url, 1.0)

def computeContrib(url_urls_rank: Tuple[str, Tuple[Iterable[str], float]]) -> Iterable[Tuple[str, float]]:
    """
    Computes the rank contribution from a URL to its outbound links.

    This function takes a tuple where:
    - The first element is the URL.
    - The second element is a tuple containing:
        - A list of outbound links (neighboring URLs).
        - The current rank of the URL.
    It calculates how much rank each outbound link should receive based on the current rank.
    Args:
        url_urls_rank (Tuple[str, Tuple[Iterable[str], float]]): A tuple containing:
            - URL (str)
            - (list of neighboring URLs, current rank) 
    Yields:
        Iterable[Tuple[str, float]]: An iterable of (url, rank_contribution) pairs.
"""
    neighbors, rank = url_urls_rank[1]
    return list(calculateRankContrib(neighbors, rank))

def wrap_neighbor(neighbor):
    """
    Wraps a single neighbor URL in a list.

    This function is useful for processing single links as lists for consistency 
    with other processing functions.

    Args:
        neighbor (str): A single neighboring URL.

    Returns:
        list: A list containing the single neighboring URL.
    """
    return [neighbor]

def combine_neighbors(a, b):
    """
    Combines two lists of neighboring URLs into a single list.

    This function is used when merging multiple neighboring URLs 
    during the reduce phase of the PageRank computation.

    Args:
        a (list): First list of neighboring URLs.
        b (list): Second list of neighboring URLs.

    Returns:
        list: A combined list of neighboring URLs.
    """
    return a + b

In [None]:
spark = SparkSession.builder \
	.appName("DS5110: my awesome Spark program") \
	.master("spark://172.31.79.76:7077") \
	.config("spark.executor.memory", "1024M") \
    .getOrCreate()

# You can read the data from a file into DataFrames
df = spark.read.csv("hdfs://172.31.79.76:9000/export.csv")

# Cast the column `_c14` to an integer type and rename to `timestamp`
# -  ensures values are treated as numeric timestamps, not strings
# - `col()` is used to reference the column by its generated name
df = df.withColumn("timestamp", col("_c14").cast("int"))
# Cast the column `_c2` to a string type and rename to `cca2`
df = df.withColumn("cca2", col("_c2").cast(StringType()))

# Sort the DataFrame based on:
# - `cca2` in ascending order
# - `timestamp` in ascending order
sorted_df = df.orderBy(["cca2", "timestamp"])

sorted_df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/10 04:09:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+---+----+---+---+--------------------+---+--------------------+---+---------------+-----+------+------+-------+----+-------------+---------+----+
|_c0| _c1|_c2|_c3|                 _c4|_c5|                 _c6|_c7|            _c8|  _c9|  _c10|  _c11|   _c12|_c13|         _c14|timestamp|cca2|
+---+----+---+---+--------------------+---+--------------------+---+---------------+-----+------+------+-------+----+-------------+---------+----+
|  5|1217| AE|ARE|United Arab Emirates|501|device-mac-501e4O...| 48|  213.42.16.154|   24|yellow|    54|Celsius|  16|1458444054343|     null|  AE|
|  1|1189| AR|ARG|           Argentina|319|meter-gauge-319Y3...| 54| 200.71.236.145|-34.6|yellow|-58.38|Celsius|  25|1458444054287|     null|  AR|
|  0| 915| AR|ARG|           Argentina|227|meter-gauge-2273p...| 34|  200.71.230.81|-34.6| green|-58.38|Celsius|  15|1458444054251|     null|  AR|
|  8|1386| AR|ARG|           Argentina|763|meter-gauge-763JW...| 82|    200.55.0.70|-34.6|yellow|-58.38|Celsius|  21|1

In [6]:
sorted_df.write.format("csv").mode("overwrite").save("hdfs://172.31.79.76:9000/ubuntu/part2_output")

                                                                                

# Part 3

## App 1

In [None]:
# Initialize the Spark session
spark = SparkSession.builder \
        .appName("PageRank") \
        .master("spark://172.31.79.76:7077") \
        .config("spark.executor.memory", "2048M") \
        .getOrCreate()

# Read the input file into RDD
linesRDD = spark.sparkContext.textFile("hdfs://172.31.79.76:9000/web-BerkStan.txt")

# Parse each line into (URL, neighbor) pairs.
# - `distinct()` ensures no duplicate links are processed
# - `groupByKey()` groups all neighbors by source URL

linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)) \
                    .distinct() \
                    .groupByKey()

# Initialize each URL's rank to 1.0
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0))

# Iteratively calculate new ranks (for 10 iterations)
for iteration in range(10):
    contribsRDD = linksRDD.join(ranksRDD).flatMap(
        lambda url_urls_rank: calculateRankContrib(url_urls_rank[1][0], url_urls_rank[1][1])
    )

    # Compute new ranks
    ranksRDD = contribsRDD.reduceByKey(lambda x, y: x + y).mapValues(lambda rank: 0.15 + 0.85 * rank)
    
# Sort the ranks in descending order
sortedRanksRDD = ranksRDD.sortBy(lambda x: -x[1])

# Output the top 50 results
top_50 = sortedRanksRDD.take(50)
for rank in top_50:
    print(rank)
    
# Save the results to HDFS as CSV
output_file_path = "hdfs://172.31.79.76:9000/output_ranks_g"
sortedRanksRDD.saveAsTextFile(output_file_path)

ranksDF = spark.createDataFrame(sortedRanksRDD, ["URL", "Rank"])

25/03/10 04:09:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.




In [None]:
# Save the DataFrame to HDFS as CSV
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.79.76:9000/part2_output")

In [None]:
# Save output to HDFS
ranksDf = ranksRDD.toDF()
df.write.format("csv").mode("overwrite").save("hdfs://172.31.79.76:9000/output_ranks")
ranksDf.write.format("csv").save("hdfs://172.31.79.76:9000/output_ranks_file2")

## App 2

In [None]:
# Set custom number of partitions
numPartitions = 100

# Read the input file into RDD and repartition
linesRDD = spark.sparkContext.textFile("hdfs://172.31.79.76:9000/web-BerkStan.txt") \
                             .repartition(numPartitions)

# Group links by key using custom partitioning
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)) \
                    .distinct() \
                    .groupByKey(numPartitions)

# Initialize Ranks RDD (Each URL starts with a rank of 1.0)
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0))

# Iteratively calculate new ranks (for 10 iterations)
for iteration in range(10):
    contribsRDD = linksRDD.join(ranksRDD).flatMap(
        lambda url_urls_rank: calculateRankContrib(url_urls_rank[1][0], url_urls_rank[1][1])
    )

    # Compute new ranks with explicit number of partitions
    ranksRDD = contribsRDD.reduceByKey(lambda x, y: x + y, numPartitions) \
                          .mapValues(lambda rank: 0.15 + 0.85 * rank)

# Sort the ranks in descending order using custom partitioning
sortedRanksRDD = ranksRDD.sortBy(lambda x: -x[1], ascending=False, numPartitions=numPartitions)

# Output the top 50 results
top_50 = sortedRanksRDD.take(50)
for rank in top_50:
    print(rank)

# Save the results to HDFS
output_file_path = "hdfs://172.31.79.76:9000/output_ranks_custom"
sortedRanksRDD.saveAsTextFile(output_file_path)

# Convert to DataFrame and save as CSV
ranksDF = spark.createDataFrame(sortedRanksRDD, ["URL", "Rank"])
ranksDF.write.csv("hdfs://172.31.79.76:9000/output_ranks_custom.csv", header=True)

## App 3

In [None]:
# Set custom number of partitions
numPartitions = 100

# Increase the number of allowed task failures to simulate recovery
spark = SparkSession.builder \
        .appName("PageRank") \
        .master("spark://172.31.79.76:7077") \
        .config("spark.executor.memory", "2048M") \
        .config("spark.task.maxFailures", "4") \
        .getOrCreate()

# Read the input file into RDD and repartition
linesRDD = spark.sparkContext.textFile("hdfs://172.31.79.76:9000/web-BerkStan.txt") \
                             .repartition(numPartitions)

# Group links by key using custom partitioning
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)) \
                    .distinct() \
                    .groupByKey(numPartitions)

# Initialize Ranks RDD (Each URL starts with a rank of 1.0)
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0))

# Iteratively calculate new ranks (for 10 iterations)
for iteration in range(10):
    contribsRDD = linksRDD.join(ranksRDD).flatMap(
        lambda url_urls_rank: calculateRankContrib(url_urls_rank[1][0], url_urls_rank[1][1])
    )

    # Introduce a delay mid-way to simulate failure opportunity
    if iteration == 5:
        import time
        print("Pausing to allow worker failure...")
        time.sleep(10)  # Give time to kill a worker manually

    # Compute new ranks with explicit number of partitions
    ranksRDD = contribsRDD.reduceByKey(lambda x, y: x + y, numPartitions) \
                          .mapValues(lambda rank: 0.15 + 0.85 * rank)

# Sort the ranks in descending order using custom partitioning
sortedRanksRDD = ranksRDD.sortBy(lambda x: -x[1], ascending=False, numPartitions=numPartitions)

# Output the top 50 results
top_50 = sortedRanksRDD.take(50)
for rank in top_50:
    print(rank)

# Save the results to HDFS
output_file_path = "hdfs://172.31.79.76:9000/output_ranks_failure"
sortedRanksRDD.saveAsTextFile(output_file_path)

# Convert to DataFrame and save as CSV
ranksDF = spark.createDataFrame(sortedRanksRDD, ["URL", "Rank"])
ranksDF.write.csv("hdfs://172.31.79.76:9000/output_ranks_failure.csv", header=True)