In [1]:
# !pip install pyspark

In [2]:
# !pip install findspark

# Finding and Starting Spark Session

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.3.1-bin-hadoop3')
findspark.find()

'/home/ubuntu/spark-3.3.1-bin-hadoop3'

In [2]:
from pyspark.sql import SparkSession

# The entry point into all functionality in Spark is the SparkSession class.
spark = (SparkSession
	.builder
	.appName("DS5110")
	.master("spark://172.31.47.226:7077")
	.config("spark.executor.memory", "1024M")
	.getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/27 20:27:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Task 2

In [3]:
df = spark.read.csv("hdfs://172.31.47.226:9000/export.csv", header=True) # read in CSV and specify header column

                                                                                

In [4]:
df.show(n=1) 

+-------------+---------+----+----+-------------+---------+--------------------+--------+------------+--------+-----+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|          ip|latitude|  lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+------------+--------+-----+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|68.161.225.1|      38|green|      -97|Celsius|  34|1458444054093|
+-------------+---------+----+----+-------------+---------+--------------------+--------+------------+--------+-----+---------+-------+----+-------------+
only showing top 1 row



In [5]:
sorted_df = df.orderBy(['cca2','timestamp']) # sort by cca2 and then timestamp columns

In [6]:
sorted_df.write.format("csv").mode("overwrite").save("hdfs://172.31.47.226:9000/sorted_export.csv") # Write the new CSV to HDFS

                                                                                

In [7]:
spark.stop()

# Part 3

In [8]:
"""
This is a skeleton of the PageRank algorithm.
Feel free to use any piece of code in this provided skeleton source file.
To use it, you will need to copy it into your Notebook. 
Feel free to make modifications to template code as you see fit.
However, you are encouraged to implement the algorithm completely on
your own. :-)
"""
import re
import sys
from operator import add
from typing import Iterable, Tuple

from pyspark.resultiterable import ResultIterable
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [9]:
"""Helper function to calculates URL contributions to the rank of other URLs"""
def calculateRankContrib(urls: ResultIterable[str], rank: float) -> Iterable[Tuple[str, float]]:
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)


"""Helper function to parses a urls string into urls pair"""
def parseNeighborURLs(urls: str) -> Tuple[str, str]:
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

In [10]:
# Initialize the Spark context
# TODO: You should define a new name for your PySpark PageRank program
spark = (SparkSession.builder.appName("A2:PageRank")
    .master("spark://172.31.47.226:7077")
    .config("spark.executor.memory", "2048M")
    .getOrCreate())

In [11]:
"""Hint: You want to use hash partitioning to optimize the performance of join"""

# Loads in input file
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...
linesRDD = spark.sparkContext.textFile("hdfs://172.31.47.226:9000/web-BerkStan.txt")

# Perform a transformation to define a links RDD by using parseNeighborURLs helper function
linksRDD = linesRDD.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey()

# Initialize a ranks RDD
ranksRDD = linksRDD.map(lambda url_neighbors: (url_neighbors[0], 1.0))

## Task 1

In [12]:
# Calculates and updates URL ranks continuously using PageRank algorithm.
# Replace N with a number ranging from 3 to 10
# For A2 you are required to complete 10 iterations
for iteration in range(10):
    # TODO: Implement the PageRank algorithm here
    contributions = linksRDD.join(ranksRDD).flatMap(lambda url_ranks: calculateRankContrib(url_ranks[1][0],url_ranks[1][1]))
    # TODO: Implement the PageRank algorithm here
    rankings = contributions.reduceByKey(add).mapValues(lambda rank: rank * .85 + .15)
    # TODO: Implement the PageRank algorithm here
  

In [13]:
top50 = rankings.sortBy(lambda x:x[1], ascending=False)
result = top50.collect()

                                                                                

In [14]:
result[:50]

[('438238', 11204.5031607055),
 ('210305', 9147.30293301283),
 ('210376', 9145.288510073151),
 ('272919', 6118.621325703636),
 ('462728', 3598.24438766882),
 ('401873', 3498.760681150826),
 ('601656', 3001.397012691197),
 ('319209', 2868.521102053853),
 ('184094', 2822.3999148030284),
 ('768', 2813.674567064774),
 ('927', 2800.7505572428845),
 ('184142', 2796.237895715289),
 ('33', 2795.832629870022),
 ('184332', 2794.7931619074297),
 ('184279', 2794.777075216343),
 ('743', 2794.226231060499),
 ('299040', 2267.0635475952595),
 ('313077', 2260.1234864305825),
 ('284306', 1991.108832922676),
 ('299039', 1990.0520473848444),
 ('477985', 1708.4374936813435),
 ('479054', 1706.7801033799392),
 ('481959', 1706.7212418775919),
 ('477172', 1706.7212418775919),
 ('571448', 1591.835353914967),
 ('571447', 1590.7113612409744),
 ('571451', 1587.665527907641),
 ('570985', 1586.920428701292),
 ('451016', 1460.8509232687802),
 ('316792', 1331.529781463461),
 ('66244', 1312.835402646977),
 ('68949', 13

In [15]:
schema = ["URL", "Rank"]
rows = [Row(*data) for data in result]
df1 = spark.createDataFrame(rows, schema=schema)

In [16]:
df1.write.format("csv").mode("overwrite").save("hdfs://172.31.47.226:9000/result.csv") # Write the new CSV to HDFS

24/02/27 20:35:49 WARN TaskSetManager: Stage 16 contains a task of very large size (2995 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

# Task 2

In [17]:
linesRDD2 = spark.sparkContext.textFile("hdfs://172.31.47.226:9000/web-BerkStan.txt")

# Perform a transformation to define a links RDD by using parseNeighborURLs helper function
linksRDD2 = linesRDD2.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey().partitionBy(10)

# Initialize a ranks RDD
ranksRDD2 = linksRDD2.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [18]:
# Calculates and updates URL ranks continuously using PageRank algorithm.
# Replace N with a number ranging from 3 to 10
# For A2 you are required to complete 10 iterations
for iteration in range(10):
    # TODO: Implement the PageRank algorithm here
    contributions2 = linksRDD2.join(ranksRDD2).flatMap(lambda url_ranks: calculateRankContrib(url_ranks[1][0],url_ranks[1][1]))
    # TODO: Implement the PageRank algorithm here
    rankings2 = contributions2.reduceByKey(add).mapValues(lambda rank: rank * .85 + .15)
    # TODO: Implement the PageRank algorithm here

In [19]:
top50_2 = rankings2.sortBy(lambda x:x[1], ascending=False)
result2 = top50_2.collect()

                                                                                

In [20]:
result2[:50]

[('438238', 11204.503160706425),
 ('210305', 9147.30293301282),
 ('210376', 9145.288510073138),
 ('272919', 6118.621325703219),
 ('462728', 3598.24438766882),
 ('401873', 3498.760681150407),
 ('601656', 3001.3970126912077),
 ('319209', 2868.5211020538586),
 ('184094', 2822.3999148026323),
 ('768', 2813.674567064378),
 ('927', 2800.75055724249),
 ('184142', 2796.2378957148944),
 ('33', 2795.832629869629),
 ('184332', 2794.7931619070346),
 ('184279', 2794.7770752159486),
 ('743', 2794.226231060105),
 ('299040', 2267.063547595239),
 ('313077', 2260.123486430563),
 ('284306', 1991.1088329226423),
 ('299039', 1990.052047384811),
 ('477985', 1708.4374936813442),
 ('479054', 1706.78010337994),
 ('481959', 1706.7212418775928),
 ('477172', 1706.7212418775925),
 ('571448', 1591.835353914962),
 ('571447', 1590.711361240969),
 ('571451', 1587.665527907636),
 ('570985', 1586.9204287012867),
 ('451016', 1460.8509232688077),
 ('316792', 1331.5297814634596),
 ('66244', 1312.835402646961),
 ('68949', 1

## Task 3

In [21]:
linesRDD3 = spark.sparkContext.textFile("hdfs://172.31.47.226:9000/web-BerkStan.txt")

# Perform a transformation to define a links RDD by using parseNeighborURLs helper function
linksRDD3 = linesRDD3.map(lambda urls: parseNeighborURLs(urls)).distinct().groupByKey()

# Initialize a ranks RDD
ranksRDD3 = linksRDD3.map(lambda url_neighbors: (url_neighbors[0], 1.0))

In [22]:
# Calculates and updates URL ranks continuously using PageRank algorithm.
# Replace N with a number ranging from 3 to 10
# For A2 you are required to complete 10 iterations
for iteration in range(10):
    # TODO: Implement the PageRank algorithm here
    contributions3 = linksRDD3.join(ranksRDD3).flatMap(lambda url_ranks: calculateRankContrib(url_ranks[1][0],url_ranks[1][1]))
    # TODO: Implement the PageRank algorithm here
    rankings3 = contributions3.reduceByKey(add).mapValues(lambda rank: rank * .85 + .15)
    # TODO: Implement the PageRank algorithm here

The below cell ranks the URLS by the 2nd item in the tuple (the ranking) in descending order and collects the results

In [23]:
top50_3 = rankings3.sortBy(lambda x:x[1], ascending=False)
result3 = top50_3.collect()

[Stage 36:>                                                         (0 + 2) / 2]

24/02/27 20:45:11 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.47.25: worker lost
24/02/27 20:45:11 WARN TaskSetManager: Lost task 1.0 in stage 36.0 (TID 158) (172.31.47.25 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: worker lost
24/02/27 20:45:11 WARN TaskSetManager: Lost task 0.0 in stage 36.0 (TID 159) (172.31.47.25 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: worker lost


                                                                                

In [24]:
result3[:50]

[('438238', 11204.503160705473),
 ('210305', 9147.302933012821),
 ('210376', 9145.288510073142),
 ('272919', 6118.621325703652),
 ('462728', 3598.24438766882),
 ('401873', 3498.760681150825),
 ('601656', 3001.397012691201),
 ('319209', 2868.521102053855),
 ('184094', 2822.3999148030257),
 ('768', 2813.674567064771),
 ('927', 2800.7505572428813),
 ('184142', 2796.2378957152855),
 ('33', 2795.8326298700194),
 ('184332', 2794.7931619074266),
 ('184279', 2794.7770752163397),
 ('743', 2794.2262310604956),
 ('299040', 2267.063547595262),
 ('313077', 2260.123486430585),
 ('284306', 1991.1088329226764),
 ('299039', 1990.052047384845),
 ('477985', 1708.437493681344),
 ('479054', 1706.7801033799396),
 ('481959', 1706.721241877592),
 ('477172', 1706.721241877592),
 ('571448', 1591.8353539149634),
 ('571447', 1590.711361240971),
 ('571451', 1587.6655279076374),
 ('570985', 1586.9204287012883),
 ('451016', 1460.8509232687784),
 ('316792', 1331.5297814634614),
 ('66244', 1312.8354026469706),
 ('6894

In [25]:
spark.stop()