### Part 2
In this part, you will implement a simple Spark application. We have provided some sample data at this link. Download the file (using wget) to your home directory on vm1.

In [1]:
import findspark as fs
fs.init('/home/ubuntu/spark-3.3.1-bin-hadoop3')
fs.find()

'/home/ubuntu/spark-3.3.1-bin-hadoop3'

In [2]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("DS5110")
            .master("spark://172.31.75.157:7077")
            .config("spark.executor.memory", "1024M")
            .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/27 10:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv("hdfs://172.31.75.157:9000/export.csv", inferSchema="true", header="true")

                                                                                

In [4]:
df.select("battery_level").show()

+-------------+
|battery_level|
+-------------+
|            8|
|            7|
|            2|
|            6|
|            4|
|            3|
|            3|
|            0|
|            3|
|            7|
|            3|
|            0|
|            6|
|            1|
|            9|
|            4|
|            0|
|            4|
|            9|
|            7|
+-------------+
only showing top 20 rows



You then need to sort the data first by the country code alphabetically (the third column, cca2) and then by the timestamp (the last column).
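The two-level ordering can be illustrated without Spark. The plain-Python sketch below uses made-up rows (the values are illustrative only and are not taken from export.csv) and a hypothetical helper `sort_rows`. It sorts on a tuple key, so the country code is compared first and the timestamp only breaks ties.

```python
# Hypothetical rows of (cca2, timestamp); values are made up for illustration.
rows = [
    ("US", 1458444054120),
    ("AR", 1458444054287),
    ("AE", 1458444054343),
    ("AR", 1458444054251),
]

def sort_rows(rows):
    # Tuples compare element by element: country code first, then timestamp.
    return sorted(rows, key=lambda r: (r[0], r[1]))

sorted_rows = sort_rows(rows)
for cca2, ts in sorted_rows:
    print(cca2, ts)
```

The two "AR" rows end up adjacent, ordered by their timestamps.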

In [5]:
new_df = df.orderBy("cca2", "timestamp")

In [6]:
new_df.show()

+-------------+---------+----+----+--------------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|                  cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+--------------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            5|     1217|  AE| ARE|United Arab Emirates|      501|device-mac-501e4O...|      48|  213.42.16.154|    24.0|yellow|     54.0|Celsius|  16|1458444054343|
|            0|      915|  AR| ARG|           Argentina|      227|meter-gauge-2273p...|      34|  200.71.230.81|   -34.6| green|   -58.38|Celsius|  15|1458444054251|
|            1|     1189|  AR| ARG|           Argentina|      319|meter-gauge-319Y3...|      54| 200.71.236.145|   -34.6|yellow|   -58.38|Celsius|  25|1458444054287|
|   

                                                                                

In [7]:
new_df.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/new_df_hdfs")

                                                                                

### Part 3
In this part, you will implement the PageRank algorithm (§2.1) (or the Wikipedia version), which is an algorithm used by search engines like Google to evaluate the quality of links to a webpage. The algorithm can be summarized as follows:

1. Set initial rank of each page to be 1.
2. On each iteration, each page p contributes to its outgoing neighbors a value of rank(p)/(# of outgoing neighbors of p).
3. Update each page’s rank to be 0.15 + 0.85 * (sum of contributions).
4. Go to next iteration.
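
The four steps above can be sketched in plain Python on a toy graph before moving to Spark. This is only an illustration under simplifying assumptions: the function name `pagerank` and the edge list are made up, and, as in a join-based implementation, a page only contributes while it currently holds a rank.

```python
from collections import defaultdict

def pagerank(edges, iterations=10):
    """Toy PageRank following the four steps listed above."""
    # Adjacency list: page -> set of outgoing neighbors (duplicate edges dropped).
    neighbors = defaultdict(set)
    for src, dst in edges:
        neighbors[src].add(dst)

    # Step 1: every page with outgoing links starts at rank 1.
    ranks = {page: 1.0 for page in neighbors}

    for _ in range(iterations):
        # Step 2: each ranked page sends rank / out-degree to each neighbor.
        contribs = defaultdict(float)
        for page, outs in neighbors.items():
            if page not in ranks:
                continue
            share = ranks[page] / len(outs)
            for dst in outs:
                contribs[dst] += share
        # Step 3: new rank = 0.15 + 0.85 * (sum of contributions).
        ranks = {page: 0.15 + 0.85 * total for page, total in contribs.items()}
        # Step 4: the loop moves on to the next iteration.
    return ranks

# Made-up toy graph: A -> B, A -> C, B -> C, C -> A
edges = [("A", "B"), ("A", "C"), ("B", "C"), ("C", "A")]
print(pagerank(edges, iterations=1))
```

In this toy graph every page has both incoming and outgoing links, so the ranks always sum to the number of pages.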

**Task 1**
Write a PySpark application that implements the PageRank algorithm. Your PageRank application should output the following two results: 1) print the 50 rows with the highest ranks; 2) convert the computed results to a Spark DataFrame and save it to HDFS as a CSV file.
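
Selecting the highest-ranked rows is a top-k problem. As a minimal local sketch (the helper `top_k_by_rank` and the sample pairs are hypothetical), `heapq.nlargest` picks the k largest `(node, rank)` pairs by rank without sorting the whole collection. On an RDD, `takeOrdered(k, key=...)` returns the k smallest elements by key, so negating the rank in the key gives the same effect.

```python
import heapq

def top_k_by_rank(ranks, k):
    # ranks: iterable of (node, rank) pairs.
    # Returns the k pairs with the highest rank, in descending rank order.
    return heapq.nlargest(k, ranks, key=lambda pair: pair[1])

# Made-up sample data for illustration.
sample_ranks = [("a", 1.0), ("b", 3.5), ("c", 2.0)]
for node, rank in top_k_by_rank(sample_ranks, 2):
    print(f"Node: {node}, Rank: {rank}")
```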

In [8]:
import re
from pyspark.sql import SparkSession
from typing import Iterable, Tuple
from pyspark.resultiterable import ResultIterable
import time

def calculateRankContrib(nodes: ResultIterable[str], rank: float) -> Iterable[Tuple[str, float]]:
    # number of outgoing neighbors of the current node
    num_neighbors = len(nodes)
    for each_node in nodes:
        # yield a tuple: element 1 == the neighbor node,
        # element 2 == the contribution it receives, i.e. the current
        # node's rank divided by the number of neighbors the current node has
        yield (each_node, rank / num_neighbors)

# takes a line from RDD (in this case two nodes)
# and turns into a tuple: Tuple
# using whitespace as the separator: split
# ex: 324     43798 --> (324, 43798)
def getNeighborNodes(nodes: str) -> Tuple[str, str]:
    parts = re.split(r'\s+', nodes)
    return parts[0], parts[1]

# get data
linesRDD = spark.sparkContext.textFile("hdfs://172.31.75.157:9000/web-BerkStan.txt")
# skip the first 4 lines of the file (assumed to be header/comment lines)
linesRDD = (
    linesRDD
    .zipWithIndex()
    .filter(lambda x: x[1] >= 4)
    .map(lambda x: x[0])
)

# Number of iterations
N = 10

                                                                                

In [9]:
%%time
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey()
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0))
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x)
    # each line of rank==(node, new rank)

CPU times: user 172 ms, sys: 12.8 ms, total: 185 ms
Wall time: 535 ms


In [10]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt1")



CPU times: user 670 ms, sys: 114 ms, total: 784 ms
Wall time: 13min 23s


                                                                                

In [11]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")



Node: 272919, Rank: 6531.324623752465
Node: 438238, Rank: 4335.32315856444
Node: 571448, Rank: 2383.8976074118887
Node: 601656, Rank: 2195.3940755967283
Node: 316792, Rank: 1855.6908757901508
Node: 319209, Rank: 1632.8193684975686
Node: 184094, Rank: 1532.28423744834
Node: 571447, Rank: 1492.9301630938783
Node: 401873, Rank: 1436.1600933469288
Node: 66244, Rank: 1261.578395867334
Node: 68949, Rank: 1260.791942134913
Node: 284306, Rank: 1257.2475650644853
Node: 68948, Rank: 1251.172353645922
Node: 96070, Rank: 1235.298540597625
Node: 77284, Rank: 1235.298540597625
Node: 68946, Rank: 1235.298540597625
Node: 95551, Rank: 1235.298540597625
Node: 66909, Rank: 1235.2985405976249
Node: 95552, Rank: 1235.2985405976247
Node: 86238, Rank: 1235.2985405976247
Node: 86239, Rank: 1235.2985405976247
Node: 86237, Rank: 1235.2985405976247
Node: 68947, Rank: 1235.2985405976247
Node: 768, Rank: 1225.5975665113074
Node: 927, Rank: 1117.8383051141836
Node: 210376, Rank: 920.6701252803678
Node: 95527, Rank:

                                                                                

**Task 2**
In order to achieve high parallelism, Spark will split the data into smaller chunks called partitions, which are distributed across different nodes in the cluster. Partitions can be changed in several ways. For example, any shuffle operation on an RDD (e.g., join()) will result in a change in partitions (customizable via user’s configuration). In addition, one can also decide how to partition data when creating/configuring RDDs (hint: e.g., you can use the function partitionBy()). For this task, add appropriate custom RDD partitioning and see what changes. For the computed result: your PageRank application should print the first 50 rows with the highest ranks.
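
To make the idea of key-based partitioning concrete, here is a small plain-Python sketch. It is an illustration only: the helpers `partition_for` and `partition_by` are hypothetical, and CRC32 is used as a stand-in hash so the result is stable across runs (Spark uses its own hash function). The point it shows is that when two datasets are partitioned by the same function into the same number of partitions, records with the same key land at the same partition index, which is what allows a join to work partition by partition.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash (CRC32), chosen only so the example is deterministic.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_by(pairs, num_partitions):
    # Place each (key, value) pair into the bucket chosen by its key.
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return buckets

# Made-up toy data: adjacency lists and ranks keyed by node id.
links = [("1", ["2", "3"]), ("2", ["3"]), ("3", ["1"])]
ranks = [("1", 1.0), ("2", 1.0), ("3", 1.0)]

link_parts = partition_by(links, 4)
rank_parts = partition_by(ranks, 4)

# Check that each partition index holds the same set of keys in both datasets.
co_located = all(
    {k for k, _ in link_parts[i]} == {k for k, _ in rank_parts[i]}
    for i in range(4)
)
print(co_located)
```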

In [12]:
%%time
# same as before, but partition the adjacency list into 4 partitions with partitionBy
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey().partitionBy(4)
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0))
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x)
    # each line of rank==(node, new rank)

CPU times: user 155 ms, sys: 8.86 ms, total: 164 ms
Wall time: 476 ms


In [13]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt2")

                                                                                

CPU times: user 519 ms, sys: 134 ms, total: 653 ms
Wall time: 8min 43s


In [14]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")



Node: 272919, Rank: 6531.324623752434
Node: 438238, Rank: 4335.323158564441
Node: 571448, Rank: 2383.8976074118855
Node: 601656, Rank: 2195.394075596729
Node: 316792, Rank: 1855.6908757901422
Node: 319209, Rank: 1632.819368497569
Node: 184094, Rank: 1532.284237448333
Node: 571447, Rank: 1492.930163093877
Node: 401873, Rank: 1436.1600933469256
Node: 66244, Rank: 1261.578395867336
Node: 68949, Rank: 1260.7919421349152
Node: 284306, Rank: 1257.2475650644835
Node: 68948, Rank: 1251.1723536459244
Node: 95552, Rank: 1235.2985405976272
Node: 77284, Rank: 1235.2985405976272
Node: 86237, Rank: 1235.2985405976272
Node: 95551, Rank: 1235.2985405976272
Node: 96070, Rank: 1235.2985405976272
Node: 86238, Rank: 1235.2985405976272
Node: 68946, Rank: 1235.2985405976272
Node: 86239, Rank: 1235.2985405976272
Node: 66909, Rank: 1235.2985405976272
Node: 68947, Rank: 1235.298540597627
Node: 768, Rank: 1225.5975665113017
Node: 927, Rank: 1117.8383051141802
Node: 210376, Rank: 920.6701252803681
Node: 95527, R

                                                                                

In [15]:
%%time
# same as before, but partition the adjacency list into 8 partitions with partitionBy
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey().partitionBy(8)
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0))
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x)
    # each line of rank==(node, new rank)

CPU times: user 112 ms, sys: 21.3 ms, total: 134 ms
Wall time: 357 ms


In [16]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt2.2")

                                                                                

CPU times: user 582 ms, sys: 142 ms, total: 723 ms
Wall time: 6min 39s


In [17]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")



Node: 272919, Rank: 6531.324623752432
Node: 438238, Rank: 4335.3231585644335
Node: 571448, Rank: 2383.8976074118877
Node: 601656, Rank: 2195.394075596731
Node: 316792, Rank: 1855.6908757901422
Node: 319209, Rank: 1632.8193684975693
Node: 184094, Rank: 1532.284237448322
Node: 571447, Rank: 1492.9301630938774
Node: 401873, Rank: 1436.1600933469194
Node: 66244, Rank: 1261.5783958673335
Node: 68949, Rank: 1260.791942134913
Node: 284306, Rank: 1257.2475650644835
Node: 68948, Rank: 1251.172353645922
Node: 86239, Rank: 1235.298540597625
Node: 95551, Rank: 1235.298540597625
Node: 68946, Rank: 1235.298540597625
Node: 66909, Rank: 1235.2985405976249
Node: 95552, Rank: 1235.2985405976247
Node: 77284, Rank: 1235.2985405976247
Node: 86237, Rank: 1235.2985405976247
Node: 68947, Rank: 1235.2985405976247
Node: 96070, Rank: 1235.2985405976247
Node: 86238, Rank: 1235.2985405976247
Node: 768, Rank: 1225.5975665112949
Node: 927, Rank: 1117.8383051141748
Node: 210376, Rank: 920.6701252803691
Node: 95527, R

                                                                                

In [18]:
%%time
# same as before, but partition both the adjacency list and the ranks into 4 partitions
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey().partitionBy(4)
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0)).partitionBy(4)
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x).partitionBy(4)
    # each line of rank==(node, new rank)

CPU times: user 95.1 ms, sys: 16.4 ms, total: 112 ms
Wall time: 322 ms


In [19]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt2.3")



CPU times: user 101 ms, sys: 32.3 ms, total: 133 ms
Wall time: 4min 38s


                                                                                

In [20]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")


Node: 272919, Rank: 6531.324623752454
Node: 438238, Rank: 4335.32315856443
Node: 571448, Rank: 2383.897607411891
Node: 601656, Rank: 2195.3940755967305
Node: 316792, Rank: 1855.6908757901538
Node: 319209, Rank: 1632.8193684975708
Node: 184094, Rank: 1532.2842374483453
Node: 571447, Rank: 1492.930163093879
Node: 401873, Rank: 1436.1600933469308
Node: 66244, Rank: 1261.5783958673362
Node: 68949, Rank: 1260.7919421349154
Node: 284306, Rank: 1257.2475650644856
Node: 68948, Rank: 1251.1723536459224
Node: 86237, Rank: 1235.2985405976276
Node: 95552, Rank: 1235.2985405976276
Node: 86239, Rank: 1235.2985405976276
Node: 95551, Rank: 1235.2985405976274
Node: 68946, Rank: 1235.2985405976272
Node: 68947, Rank: 1235.2985405976272
Node: 77284, Rank: 1235.2985405976272
Node: 86238, Rank: 1235.298540597627
Node: 66909, Rank: 1235.298540597627
Node: 96070, Rank: 1235.2985405976256
Node: 768, Rank: 1225.5975665113103
Node: 927, Rank: 1117.8383051141866
Node: 210376, Rank: 920.670125280368
Node: 95527, R

                                                                                

**Task 3**
Kill a Worker process and observe the changes. Trigger the failure on a selected worker VM when the application has reached anywhere between 25% and 75% of its lifetime (hint: use the Spark Jobs web interface to track the detailed job execution progress):

1. From a shell, clear the memory cache on vm2 using sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches".
2. In your shell, kill the Worker process on vm2: use jps to get the process ID (PID) of the Spark Worker on vm2, then use the command kill -9 <Worker_PID> to kill the Spark Worker process.

For the computed result: your PageRank application should print the first 50 rows with the highest ranks.
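
Finding the Worker PID by eye is easy to get wrong when several JVMs are running. As a small sketch, the hypothetical helper below parses text in the `<pid> <name>` per-line layout that jps prints by default and returns the PID of the process named `Worker`. The sample output is made up for illustration.

```python
def find_worker_pid(jps_output: str):
    """Return the PID of the first process named 'Worker', or None if absent."""
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) >= 2 and parts[1] == "Worker":
            return int(parts[0])
    return None

# Made-up sample of what jps might print on a worker VM.
sample = """12345 DataNode
23456 Worker
34567 Jps"""
print(find_worker_pid(sample))
```

On vm2, the same function could be fed the real output, for example from `subprocess.run(["jps"], capture_output=True, text=True).stdout`, and the resulting PID passed to kill -9.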

In [21]:
%%time
# same as Task 1 (no custom partitioning)
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey()
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0))
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x)
    # each line of rank==(node, new rank)

CPU times: user 166 ms, sys: 14 ms, total: 180 ms
Wall time: 529 ms


In [22]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt3")



24/02/27 10:58:39 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.72.60: worker lost




24/02/27 10:59:04 WARN TaskSetManager: Lost task 10.1 in stage 283.0 (TID 2377) (172.31.75.157 executor 1): FetchFailed(null, shuffleId=89, mapIndex=-1, mapId=-1, reduceId=8, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 89 partition 8
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1705)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1652)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1651)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1651)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1294)
	at org.apache.spark.MapOutputTrackerWorker.ge



CPU times: user 484 ms, sys: 85.4 ms, total: 570 ms
Wall time: 15min 18s


                                                                                

In [23]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")



Node: 272919, Rank: 6531.324623752465
Node: 438238, Rank: 4335.323158564438
Node: 571448, Rank: 2383.8976074118887
Node: 601656, Rank: 2195.394075596728
Node: 316792, Rank: 1855.690875790152
Node: 319209, Rank: 1632.8193684975686
Node: 184094, Rank: 1532.28423744834
Node: 571447, Rank: 1492.9301630938787
Node: 401873, Rank: 1436.1600933469285
Node: 66244, Rank: 1261.5783958673337
Node: 68949, Rank: 1260.791942134913
Node: 284306, Rank: 1257.2475650644851
Node: 68948, Rank: 1251.1723536459222
Node: 96070, Rank: 1235.298540597625
Node: 86239, Rank: 1235.298540597625
Node: 86237, Rank: 1235.298540597625
Node: 95551, Rank: 1235.298540597625
Node: 95552, Rank: 1235.2985405976249
Node: 86238, Rank: 1235.2985405976249
Node: 77284, Rank: 1235.2985405976249
Node: 68946, Rank: 1235.2985405976249
Node: 66909, Rank: 1235.2985405976249
Node: 68947, Rank: 1235.2985405976249
Node: 768, Rank: 1225.5975665113076
Node: 927, Rank: 1117.8383051141843
Node: 210376, Rank: 920.6701252803678
Node: 95527, Rank



In [24]:
%%time
# same as the last Task 2 run: partition both the adjacency list and the ranks into 4 partitions
# build the adjacency list from the lines of the RDD
# map: parse each line into a (current node, neighbor node) key-value pair
# distinct: drop duplicate edges
# groupByKey: group by key (current node) so that each unique node (key)
# has an iterable of its neighbor nodes (value)
nodesRDD = linesRDD.map(lambda nodes: getNeighborNodes(nodes)).distinct().groupByKey().partitionBy(4)
# each line of nodes==(node, iterable(neighbors))

# Initialize a ranks RDD
# for each tuple of nodes:
# take each node and set rank==1
ranksRDD = nodesRDD.map(lambda key: (key[0], 1.0)).partitionBy(4)
# each line of rank==(node, 1)

# Calculates and updates node ranks continuously using PageRank algorithm
for iteration in range(N):
    # join: combine nodes with ranks ==> (node, (iterable(neighbors), rank))
    # flatMap: for each joined record x=(node, (iterable(neighbors), rank)),
    # emit one (neighbor, contribution) pair per neighbor
    contributions = nodesRDD.join(ranksRDD).flatMap(lambda x: calculateRankContrib(x[1][0], x[1][1]))
    # each line of contributions==(neighbor node, contribution received)
    
    # Update ranks based on contributions
    # reduceByKey: sum all contributions received by each node
    # mapValues: apply 0.15 + 0.85 * sum to each value, keeping the key
    ranksRDD = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x).partitionBy(4)
    # each line of rank==(node, new rank)

CPU times: user 110 ms, sys: 32.3 ms, total: 143 ms
Wall time: 325 ms


In [25]:
%%time
ranksDF = ranksRDD.toDF()
ranksDF.write.format("csv").mode("overwrite").save("hdfs://172.31.75.157:9000/ranksDF_pt3.2")



24/02/27 11:19:37 ERROR TaskSchedulerImpl: Lost executor 2 on 172.31.72.60: worker lost
24/02/27 11:19:37 WARN TaskSetManager: Lost task 1.0 in stage 346.0 (TID 2657) (172.31.72.60 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: worker lost
24/02/27 11:19:37 WARN TaskSetManager: Lost task 0.0 in stage 346.0 (TID 2656) (172.31.72.60 executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: worker lost
24/02/27 11:19:37 WARN TaskSetManager: Lost task 0.1 in stage 346.0 (TID 2658) (172.31.75.157 executor 1): FetchFailed(null, shuffleId=116, mapIndex=-1, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 116 partition 0
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1705)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1652)
	at org.apache.spark.MapOutputTrac



CPU times: user 222 ms, sys: 53.1 ms, total: 275 ms
Wall time: 11min 54s


                                                                                

In [26]:
%%time
# top 50 nodes by rank
top_50 = ranksRDD.takeOrdered(50, key=lambda x: -x[1])
# Print the top 50
for node, rank in top_50:
    print(f"Node: {node}, Rank: {rank}")



Node: 272919, Rank: 6531.324623752456
Node: 438238, Rank: 4335.323158564431
Node: 571448, Rank: 2383.8976074118887
Node: 601656, Rank: 2195.3940755967324
Node: 316792, Rank: 1855.6908757901538
Node: 319209, Rank: 1632.8193684975706
Node: 184094, Rank: 1532.284237448342
Node: 571447, Rank: 1492.9301630938805
Node: 401873, Rank: 1436.1600933469283
Node: 66244, Rank: 1261.5783958673321
Node: 68949, Rank: 1260.7919421349113
Node: 284306, Rank: 1257.2475650644844
Node: 68948, Rank: 1251.17235364592
Node: 86239, Rank: 1235.298540597624
Node: 86238, Rank: 1235.2985405976237
Node: 95551, Rank: 1235.2985405976233
Node: 86237, Rank: 1235.298540597623
Node: 68946, Rank: 1235.298540597623
Node: 66909, Rank: 1235.298540597623
Node: 77284, Rank: 1235.298540597623
Node: 96070, Rank: 1235.298540597623
Node: 68947, Rank: 1235.2985405976228
Node: 95552, Rank: 1235.2985405976228
Node: 768, Rank: 1225.5975665113071
Node: 927, Rank: 1117.8383051141832
Node: 210376, Rank: 920.670125280368
Node: 95527, Rank:

