# Project 3: Big graphs

The objective of this project is to use Spark’s APIs to analyze the flight interconnected data to understand the popularity of the airports and flight patterns.

## Session setup

In [50]:
import pyspark
from delta import configure_spark_with_delta_pip

# Prepare the Spark builder
builder = pyspark.sql.SparkSession.builder.appName("Project_3") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "16g")

spark = configure_spark_with_delta_pip(builder,extra_packages=["graphframes:graphframes:0.8.4-spark3.5-s_2.12"]).getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", spark._sc.defaultParallelism)

#spark.conf.set("spark.sql.repl.eagerEval.enabled",True) # OK for exploration, not great for performance
spark.conf.set("spark.sql.repl.eagerEval.truncate", 500)

In [51]:
import graphframes as gf
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
import json
import numpy as np

## Creating the Graph

In [52]:
# Reading in the data

flight_df = spark.read.csv("input/2009.csv", header=True, inferSchema=True)
display(flight_df)

DataFrame[FL_DATE: date, OP_CARRIER: string, OP_CARRIER_FL_NUM: int, ORIGIN: string, DEST: string, CRS_DEP_TIME: int, DEP_TIME: double, DEP_DELAY: double, TAXI_OUT: double, WHEELS_OFF: double, WHEELS_ON: double, TAXI_IN: double, CRS_ARR_TIME: int, ARR_TIME: double, ARR_DELAY: double, CANCELLED: double, CANCELLATION_CODE: string, DIVERTED: double, CRS_ELAPSED_TIME: double, ACTUAL_ELAPSED_TIME: double, AIR_TIME: double, DISTANCE: double, CARRIER_DELAY: double, WEATHER_DELAY: double, NAS_DELAY: double, SECURITY_DELAY: double, LATE_AIRCRAFT_DELAY: double, Unnamed: 27: string]

In [53]:
# Vertices

flight_vertices = (flight_df
                  .select(F.col("ORIGIN").alias("id"))
                  .union(flight_df
                        .select(F.col("DEST").alias("id")))
                  .distinct()
)

display(flight_vertices)

DataFrame[id: string]

In [54]:
# Edges

flight_edges = (flight_df
               .withColumnRenamed("ORIGIN","src")
               .withColumnRenamed("DEST","dst")
)

display(flight_edges)

DataFrame[FL_DATE: date, OP_CARRIER: string, OP_CARRIER_FL_NUM: int, src: string, dst: string, CRS_DEP_TIME: int, DEP_TIME: double, DEP_DELAY: double, TAXI_OUT: double, WHEELS_OFF: double, WHEELS_ON: double, TAXI_IN: double, CRS_ARR_TIME: int, ARR_TIME: double, ARR_DELAY: double, CANCELLED: double, CANCELLATION_CODE: string, DIVERTED: double, CRS_ELAPSED_TIME: double, ACTUAL_ELAPSED_TIME: double, AIR_TIME: double, DISTANCE: double, CARRIER_DELAY: double, WEATHER_DELAY: double, NAS_DELAY: double, SECURITY_DELAY: double, LATE_AIRCRAFT_DELAY: double, Unnamed: 27: string]

#### GraphFrame

In [55]:
flight_graph = gf.GraphFrame(flight_vertices, flight_edges)

flight_vertices.cache()
flight_edges.cache()

display(flight_graph)

GraphFrame(v:[id: string], e:[src: string, dst: string ... 26 more fields])

## Query 1 (0)

Compute different statistics : in-degree, out-degree and total degree (without existing functions). 

### In-degree

In [56]:
in_degree = flight_graph.edges.groupBy("dst").agg(F.count("*").alias("inDegree"))

in_degree.show()

+---+--------+
|dst|inDegree|
+---+--------+
|IAH|  182088|
|JAX|   28813|
|ABQ|   35577|
|IND|   38198|
|BOS|  110463|
|GRR|   13970|
|MEM|   71721|
|PBI|   25496|
|XNA|   13764|
|LBB|    8004|
|BTV|    6021|
|VPS|    6958|
|SYR|    9330|
|JFK|  119571|
|MBS|    3443|
|SBN|    4527|
|PDX|   52251|
|RDD|    1433|
|LNK|    2765|
|HPN|   10661|
+---+--------+
only showing top 20 rows



In [57]:
# For validation
flight_graph.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|IAH|  182088|
|JAX|   28813|
|ABQ|   35577|
|IND|   38198|
|BOS|  110463|
|GRR|   13970|
|MEM|   71721|
|PBI|   25496|
|XNA|   13764|
|LBB|    8004|
|BTV|    6021|
|VPS|    6958|
|SYR|    9330|
|JFK|  119571|
|MBS|    3443|
|SBN|    4527|
|PDX|   52251|
|RDD|    1433|
|LNK|    2765|
|HPN|   10661|
+---+--------+
only showing top 20 rows



### Out-degree

In [58]:
out_degree = flight_graph.edges.groupBy("src").agg(F.count("*").alias("outDegree"))
out_degree.show()

+---+---------+
|src|outDegree|
+---+---------+
|IAH|   182097|
|JAX|    28810|
|ABQ|    35582|
|IND|    38201|
|GRR|    13973|
|LBB|     8002|
|MEM|    71713|
|BTV|     6028|
|BOS|   110460|
|PBI|    25500|
|XNA|    13755|
|VPS|     6959|
|SYR|     9336|
|JFK|   119574|
|MBS|     3444|
|SBN|     4526|
|PDX|    52242|
|RDD|     1433|
|LNK|     2765|
|HPN|    10657|
+---+---------+
only showing top 20 rows



In [59]:
# For validation
flight_graph.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|IAH|   182097|
|JAX|    28810|
|ABQ|    35582|
|IND|    38201|
|GRR|    13973|
|LBB|     8002|
|MEM|    71713|
|BTV|     6028|
|BOS|   110460|
|PBI|    25500|
|XNA|    13755|
|VPS|     6959|
|SYR|     9336|
|JFK|   119574|
|MBS|     3444|
|SBN|     4526|
|PDX|    52242|
|RDD|     1433|
|LNK|     2765|
|HPN|    10657|
+---+---------+
only showing top 20 rows



### Total degree

In [60]:
# Finding the total degrees
total_degree = flight_graph.edges.select(F.col("src").alias("id")) \
    .union(flight_graph.edges.select(F.col("dst").alias("id"))) \
    .groupBy("id") \
    .agg(F.count("*").alias("degree"))

total_degree.show()

+---+------+
| id|degree|
+---+------+
|IAH|364185|
|JAX| 57623|
|ABQ| 71159|
|IND| 76399|
|GRR| 27943|
|LBB| 16006|
|MEM|143434|
|BTV| 12049|
|BOS|220923|
|PBI| 50996|
|XNA| 27519|
|VPS| 13917|
|SYR| 18666|
|JFK|239145|
|MBS|  6887|
|SBN|  9053|
|PDX|104493|
|RDD|  2866|
|LNK|  5530|
|HPN| 21318|
+---+------+
only showing top 20 rows



In [61]:
# For validation
flight_graph.degrees.show()

+---+------+
| id|degree|
+---+------+
|IAH|364185|
|JAX| 57623|
|ABQ| 71159|
|IND| 76399|
|BOS|220923|
|GRR| 27943|
|LBB| 16006|
|MEM|143434|
|BTV| 12049|
|PBI| 50996|
|XNA| 27519|
|VPS| 13917|
|SYR| 18666|
|JFK|239145|
|MBS|  6887|
|SBN|  9053|
|PDX|104493|
|RDD|  2866|
|LNK|  5530|
|HPN| 21318|
+---+------+
only showing top 20 rows



### Triangle count

We calculate how many times each node appears in by converting the graph into an undirected graph, then finding all triangles in the graph and finally counting how many triangles each node is part of. To find all triangles in the graph we first look for paths a -> b and b -> c and then select triangles where there is an edge c -> a. We sort the nodes of all triangles alphabetically to remove all duplicate triangles.

In [62]:
# Removing all duplicate edges from the graph based on src and dst
flight_edges_distinct = flight_edges.dropDuplicates(['src', 'dst'])

# Making the graph undirected to mimic the built-in funtion
undirected_edges = flight_edges_distinct.selectExpr("src", "dst") \
    .union(flight_edges_distinct.selectExpr("dst as src", "src as dst")).dropDuplicates()

# Finding all a -> b, b -> c pairs 
paths = undirected_edges.alias("e1") \
    .join(flight_edges_distinct.alias("e2"), F.col("e1.dst") == F.col("e2.src")) \
    .select(F.col("e1.src").alias("a"),
            F.col("e1.dst").alias("b"),
            F.col("e2.dst").alias("c"))

# Finding all triangles with c -> a
triangles = paths \
    .join(undirected_edges.alias("e3"), (F.col("e3.src") == F.col("c")) & (F.col("e3.dst") == F.col("a"))) \
    .select("a", "b", "c")

# Sorting the triangle nodes alphabetically to drop duplicates
triangles_sorted = triangles.select(F.array_sort(F.array("a", "b", "c")).alias("triangle")).dropDuplicates()

# Converting all triangles into one column of veritces [a, b, c] [c, d, a] -> a, b, c, c, d, a
triangle_vertices = triangles_sorted.select(F.explode("triangle").alias("id"))

# Counting the number of triangles each airport is part of 
triangle_counts = triangle_vertices.groupBy("id").count()

# Adding nodes that are part of 0 triangles to the result to match the built-in function
result = flight_graph.vertices.select("id") \
    .join(triangle_counts, on="id", how="left") \
    .withColumn("count", F.coalesce(F.col("count"), F.lit(0)))

result.show()

+---+-----+
| id|count|
+---+-----+
|IAH| 1338|
|JAX|  342|
|ABQ|  311|
|IND|  612|
|GRR|   96|
|LBB|   23|
|MEM| 1105|
|BTV|   34|
|BOS|  860|
|PBI|  168|
|XNA|   97|
|VPS|   10|
|SYR|   82|
|JFK|  942|
|MBS|    6|
|SBN|   13|
|PDX|  413|
|RDD|    1|
|LNK|   38|
|HPN|   36|
+---+-----+
only showing top 20 rows



In [63]:
# Comparing our results with the built-in function to validate all results
joined_triangles = flight_graph.triangleCount().alias('a').join(result.alias('b'), on='id')
diff_counts = joined_triangles.filter(F.col('a.count') != F.col('b.count')) # Should be empty if all is correct
print("Differences in the results (should be empty):")
diff_counts.select('id', F.col('a.count').alias('Built-in_count'), F.col('b.count').alias('Custom_count')).show()

Differences in the results (should be empty):
+---+--------------+------------+
| id|Built-in_count|Custom_count|
+---+--------------+------------+
+---+--------------+------------+



## Query 2 (1)
Compute the total number of triangles in the graph.

The query to find the total number of triangles in the graph uses the same logic as the previous query, but it just counts the total number of unique triangles, not node appearances.

In [64]:
# Removing all duplicate edges from the graph based on src and dst
flight_edges_distinct = flight_edges.dropDuplicates(['src', 'dst'])

# Making the graph undirected to mimic the built-in funtion
undirected_edges = flight_edges_distinct.selectExpr("src", "dst") \
    .union(flight_edges_distinct.selectExpr("dst as src", "src as dst")).dropDuplicates()

# Finding all a -> b, b -> c pairs 
paths = undirected_edges.alias("e1") \
    .join(flight_edges_distinct.alias("e2"), F.col("e1.dst") == F.col("e2.src")) \
    .select(F.col("e1.src").alias("a"),
            F.col("e1.dst").alias("b"),
            F.col("e2.dst").alias("c"))

# Finding all triangles with c -> a
triangles = paths \
    .join(undirected_edges.alias("e3"), (F.col("e3.src") == F.col("c")) & (F.col("e3.dst") == F.col("a"))) \
    .select("a", "b", "c")

# Sorting the triangle nodes alphabetically to drop duplicates
triangles_sorted = triangles.select(F.array_sort(F.array("a", "b", "c")).alias("triangle")).dropDuplicates()

# Count the total triangles in the graph
total_triangle_count = triangles_sorted.count()

print(f"Total number of triangles in the flight graph is {total_triangle_count}.")

Total number of triangles in the flight graph is 16015.


To find the total number of of unique triangles in the graph using the built-in function replica, we sum all the individual node triangle counts and divided it by 3 since each triangle is included three times (once for each of its 3 vertices).

In [65]:
# Finding the sum of all triangles in the graph using the triangle count results from q1
total_triangle_count = triangle_counts.agg(F.sum("count")).first()[0] // 3
print(f"Total number of triangles in the flight graph is {total_triangle_count}.")

Total number of triangles in the flight graph is 16015.


In [66]:
# For validation
triangle_count_val = flight_graph.triangleCount().agg(F.sum("count")).first()[0] // 3
print(f"Total number of triangles in the flight graph based on the built-in function is {triangle_count_val}.")

Total number of triangles in the flight graph based on the built-in function is 16015.


## Precomputation for query 3 and 4

In [68]:
flight_vertices_q = (flight_df
                  .select(F.col("ORIGIN").alias("id"))
                  .union(flight_df
                        .select(F.col("DEST").alias("id")))
                  .distinct()
)
flight_edges_q = (flight_df
                .select(F.col('ORIGIN').alias('src'), F.col('DEST').alias('dst'), F.col('DISTANCE'))
                .distinct()
)
flight_graph_q = gf.GraphFrame(flight_vertices_q, flight_edges_q)

N = flight_graph_q.vertices.count()

# lookup table ID:'str' -> idx:int
indexes = {}
i = 0
for node in flight_graph_q.vertices.toLocalIterator():
    indexes[node.id] = i
    i += 1

# Adjacency matrix, distance matrix
adjM = np.zeros(N * N).reshape((N, N))
distM = np.zeros(N * N).reshape((N, N))

for edge in flight_graph_q.edges.toLocalIterator():
    fromIdx = indexes[edge.src] 
    toIdx = indexes[edge.dst]
    adjM[toIdx][fromIdx] = 1
    distM[toIdx][fromIdx] = edge.DISTANCE

## Query 3 (2)
Compute a centrality measure of your choice natively on Spark using Graphframes.

In [69]:
# Closeness Centrality

# BFS function for calculating a measure of closeness centrality from a given node

# Shortest path -> least edges taken
def bfs_edges(vertices, lookup, adjM, v: str):
    lookup = json.loads(lookup) # For mapping vertex names to their indexes
    adjM = json.loads(adjM) # Represents graph's adjacency matrix
    N = len(vertices)
    distances = [float('inf')] * N # initial list of distances (all infinity)
    distances[lookup[v]] = 0 # distance to the start node is 0
    visited = set() # set of visited nodes (tracked by index)
    q = [v] # queue

    # Processes nodes while queue not empty
    while len(q) != 0:
        curr = q.pop(0) # Take the first node
        currIdx = lookup[curr] # Get the index of current node
        
        # Check if the node has already been visited
        if currIdx in visited:
            continue
        visited.add(currIdx) # Mark node as visited

        # Check all other nodes to see if they are directly connected to the current node
        for i in range(N):
            # If there's an edge from current node to node i and node i has not been visited
            if adjM[i][currIdx] == 1 and not i in visited:
                distances[i] = min(distances[i], distances[currIdx] + 1) # If a shorter path is found then update the distance to node i
                q.append(vertices[i])

    return (N - 1)/sum(distances)

# Shortest path -> actual physical shortest path
def bfs_dist(vertices, lookup, distM, v: str):
    lookup = json.loads(lookup) # For mapping vertex names to their indexes
    distM = json.loads(distM) # Represents graph's adjacency matrix
    N = len(vertices)
    distances = [float('inf')] * N # initial list of distances (all infinity)
    distances[lookup[v]] = 0 # distance to the start node is 0
    visited = set() # set of visited nodes (tracked by index)
    q = [v] # queue

    # Processes nodes while queue not empty
    while len(q) != 0:
        curr = q.pop(0) # Take the first node
        currIdx = lookup[curr] # Get the index of current node
        
        # Check if the node has already been visited
        if currIdx in visited:
            continue
        visited.add(currIdx) # Mark node as visited

        # Check all other nodes to see if they are directly connected to the current node
        for i in range(N):
            # If there's an edge from current node to node i and node i has not been visited
            if distM[i][currIdx] != 0 and not i in visited:
                distances[i] = min(distances[i], distances[currIdx] + distM[i][currIdx]) # If a shorter path is found then update the distance to node i
                q.append(vertices[i])

    return (N - 1)/sum(distances)

bfs_edges_udf = F.udf(bfs_edges, FloatType())
bfs_dist_udf = F.udf(bfs_dist, FloatType())

In [70]:
vertexList = [node.id for node in flight_graph_q.vertices.collect()]
centralities = flight_graph_q.vertices.withColumn('centrality_edges', bfs_edges_udf(F.lit(vertexList), F.lit(json.dumps(indexes)), F.lit(json.dumps(adjM.tolist())), 'id'))
centralities = centralities.withColumn('centrality_dist', bfs_dist_udf(F.lit(vertexList), F.lit(json.dumps(indexes)), F.lit(json.dumps(distM.tolist())), 'id'))
centralities.show()

+---+----------------+---------------+
| id|centrality_edges|centrality_dist|
+---+----------------+---------------+
|IAH|       0.5983773|    7.851529E-4|
|JAX|      0.47504026|   6.9578097E-4|
|ABQ|      0.50687283|   7.6872966E-4|
|IND|       0.5139373|    8.692603E-4|
|GRR|      0.47049442|    8.364784E-4|
|LBB|      0.40466392|    7.721947E-4|
|MEM|       0.5662188|   8.7641896E-4|
|BTV|      0.43382353|   6.0750963E-4|
|BOS|      0.53636366|    5.937573E-4|
|PBI|      0.45807454|    6.000716E-4|
|XNA|      0.47580644|    8.764086E-4|
|VPS|      0.42753622|   7.3864305E-4|
|SYR|      0.43768546|    6.703997E-4|
|JFK|       0.5462963|    6.488807E-4|
|MBS|      0.41143653|   8.0058404E-4|
|SBN|      0.43703705|   8.5602776E-4|
|PDX|       0.5305755|     5.75475E-4|
|RDD|      0.36019537|    5.204612E-4|
|LNK|      0.45807454|    8.191714E-4|
|HPN|      0.43255132|    6.062599E-4|
+---+----------------+---------------+
only showing top 20 rows



## Query 4 (3)
Implement the PageRank algorithm natively on Spark using Graphframes.

In [75]:
# Count total number of vertices
N = flight_vertices_q.count()

# Initializing ranks by setting an equal rank to all nodes
ranks = flight_vertices_q.withColumn("rank", F.lit(1.0))
out_degree = flight_graph_q.edges.groupBy("src").agg(F.count("*").alias("outDegree"))

damping_factor = 0.85
num_iters = 10

for i in range(num_iters):
    # Joining edges with current ranks to find how much rank each source node contributes to its destination node
    contributions = flight_edges_q.join(ranks, flight_edges_q.src == ranks.id).join(out_degree, on="src").withColumn("contribution", F.col("rank") / F.col("outDegree")).select("dst", "contribution")
    
    # Sum the contributions for all destination nodes
    sum_contributions = contributions.groupBy("dst").agg(F.sum("contribution").alias("incoming"))

    # Computing new rank values based on incoming contribution score (+ dampening)
    updated_ranks = sum_contributions.withColumn("rank", F.lit((1 - damping_factor)) + damping_factor * F.col("incoming")).select(F.col("dst").alias("id"), "rank")

    # Updating ranks dataframe
    # joining with vetrices to ensure all nodes have a rank value even if they were unaffected by this iteration
    ranks = flight_vertices_q.join(updated_ranks, on="id", how="left_outer").withColumn("rank", F.coalesce(F.col("rank"), F.lit((1 - damping_factor) / N))).select("id", "rank")

In [76]:
result = flight_graph_q.pageRank(resetProbability=0.15, maxIter=10)

In [77]:
result = result.vertices.join(ranks, on='id', how='inner')
result.show()

+---+-------------------+-------------------+
| id|           pagerank|               rank|
+---+-------------------+-------------------+
|IAH|  5.662321605756482|  5.662321605756484|
|JAX| 1.6077915704615784| 1.6077915704615786|
|ABQ| 1.7429124686716673| 1.7429124686716668|
|IND| 1.9850210090574687| 1.9850210090574683|
|GRR|  0.836374678808676| 0.8363746788086759|
|LBB| 0.5372659782130557| 0.5372659782130556|
|MEM|  4.676034097666703|  4.676034097666703|
|BTV| 0.5764612714461971| 0.5764612714461971|
|BOS| 2.8138753782688175| 2.8138753782688166|
|PBI| 1.1467079819343051| 1.1467079819343051|
|XNA|   0.89689132633325| 0.8968913263332497|
|VPS|0.40682553897292073| 0.4068255389729206|
|SYR| 0.7563288719817197| 0.7563288719817195|
|JFK|  3.508955992981467| 3.5089559929814675|
|MBS| 0.3536217424004337|0.35362174240043376|
|SBN|0.41375417295624806|0.41375417295624806|
|PDX| 2.3823714398188938| 2.3823714398188933|
|RDD| 0.2915821901287276| 0.2915821901287276|
|LNK| 0.5806182004896793| 0.580618

## Query 5 (4)
Find the group of the most connected airports.

To find the group of most connected components, we replicated the connectedComponents() function. As preparation we removed all duplicate edges for less processing.

To find all connected components, we first initialized the components so that each node belongs to a separate component. After this, we added all nodes that are reachable from the initial node to respective components. This process is repeated until the results converge (in this specific case, it takes 4 iterations, and the graph is fully connected). Finally, the largest connected component is identified by selecting the component with the largest size.

In [74]:
# Removing all duplicate edges from the graph based on src and dst
flight_edges_distinct = flight_edges.dropDuplicates(['src', 'dst'])

# Creating initial components where each node starts as its own component
components = flight_vertices.select("id").withColumnRenamed("id", "node").withColumn("label", F.col("node"))

# After 4 iterations there is 1 large component (meaning the graph is fully connected)
for i in range(4):
    # Joining the edges with the components to propagate the component labels from neighbors
    neighbors = flight_edges_distinct.join(components, flight_edges_distinct.src == components.node, "left") \
        .select(F.col("dst").alias("node"), "label") \
        .repartition(200, "node")

    # Updating the components using the alpabetically first node as the label for the component
    components = neighbors.groupBy("node") \
        .agg(F.min("label").alias("label")) \
        .repartition(200, "node")

    # Counting the number of connected components after this iteration for tracking
    component_sizes = components.groupBy("label").agg(F.count("*").alias("size"))
    print(f"Iteration {i+1}: {component_sizes.count()} components.")

# Extracting the largest component label (alpabetically first node)
largest_component_label = component_sizes.orderBy(F.desc("size")).limit(1).collect()[0]["label"]
most_connected_airports = components.filter(F.col("label") == largest_component_label).select("node")

print(f"The most connected group contains {most_connected_airports.count()} out of {flight_vertices.count()} airports.")

Iteration 1: 42 components.
Iteration 2: 7 components.
Iteration 3: 2 components.
Iteration 4: 1 components.
The most connected group contains 296 out of 296 airports.
