1) In this task, you are required to use PySpark and the MapReduce paradigm to identify the connected components in a flight network graph. The focus should be on airports rather than cities. As you know, a connected component refers to a group of airports where every pair of airports within the group is connected either directly or indirectly.

The function takes the following inputs:


1.   Flight network
2.   A starting date
3.   An end date


The function outputs:

1.   The number of the connected components during that period
2.   The size of each connectd component
3.   The airports within the largest connected component identified

The first thing to do is the Data Processing. So, after loading the data from the .CSV file, I have to:

*   filter flights based on a specified date range(*start_date* and *end_date*).
*   extract edges representing direct flights between airports.

After that, I have to implement the connected components algorithm using the **MapReduce** paradigm. So the things that I have to do are:


*   Initialize each airport with its own unique ID.
*   Iteratively propagate the smallest component ID across connected airports until convergence.

This will allow me to calculate the outputs of the function, that are:


*   The total number of the connected components.
*   The size of each connected component.
*   Identify the airports in the largest connected component





Function "***preprocess_data***" has these parameters:

*   **csv_file**: Path to the flight data CSV file.
*   **start_date**: Start of the date range (inclusive).
*   **end_date**: End of the date range (inclusive).

And returns a list of tuples representing graph edges (Origin_airport, Destination_airport).

Function "***connected_components_mapreduce***" has this parameter:

*   **flight_edges**: List of tuples representing the edges of the graph.

And returns a dictionary containing the number of components, their sizes, and the largest component's airports.

Function "***map_to_min_component***" has these parameters:

*   **edge**: Tuple representing an edge (src, dest).    
*   **components_dict**: Current component assignments for all airports.

And returns a list of tuples mapping airports to the smallest component ID.

Function "***has_converged***" has these parameters:


*   **prev**: RDD of previous component assignments.
*   **current**: RDD of current component assignments.

And returns a boolean indicating convergence.

Function "***run_connected_components***" has these parameters:

*   **csv_file**: Path to the flight data CSV file.
*   **start_date**: Start of the date range (inclusive).
*   **end_date**: End of the date range (inclusive).

In [1]:
import pandas as pd
from pyspark import SparkContext, SparkConf

def preprocess_data(csv_file, start_date, end_date):

    # Preprocesses the flight data to filter records within the specified date range and prepares edges for graph analysis
    # Load the dataset
    data = pd.read_csv(csv_file)

    # Ensure Fly_date is in datetime format for filtering
    data["Fly_date"] = pd.to_datetime(data["Fly_date"])

    # Filter records within the specified date range
    filtered_data = data[(data["Fly_date"] >= start_date) & (data["Fly_date"] <= end_date)]

    # Select columns relevant for edges and remove duplicates
    edges = filtered_data[["Origin_airport", "Destination_airport"]].dropna().drop_duplicates()

    return edges.values.tolist()  # Convert to list of tuples for further processing

def connected_components_mapreduce(flight_edges):

    # Identifies connected components in the flight network using the MapReduce paradigm
    # Initialize Spark
    conf = SparkConf().setAppName("ConnectedComponents").setMaster("local")
    sc = SparkContext(conf=conf)

    # Convert edge list to RDD
    edges_rdd = sc.parallelize(flight_edges)

    # Initialize each airport with itself as its component
    def initialize_components(edge):
        src, dest = edge
        return [(src, src), (dest, dest)]  # Each airport is its own component initially

    components = edges_rdd.flatMap(initialize_components).reduceByKey(lambda x, y: x)  # Reduce ensures no duplicates

    # Propagate the smallest component ID iteratively
    def map_to_min_component(edge, components_dict):

        # Maps each edge to propagate the smallest component ID to both endpoints
        src, dest = edge
        src_component = components_dict.get(src, src)
        dest_component = components_dict.get(dest, dest)
        min_component = min(src_component, dest_component)
        return [(src, min_component), (dest, min_component)]

    def has_converged(prev, current):

        # Checks if the component assignments have stabilized

        return prev.subtractByKey(current).isEmpty()

    prev_components = components  # Initialize the first iteration
    converged = False  # Convergence flag

    while not converged:

        # Collect current component assignments as a dictionary for quick lookup
        components_dict = prev_components.collectAsMap()

        # Update components by mapping edges to the smallest component ID
        updated_components = edges_rdd.flatMap(lambda edge: map_to_min_component(edge, components_dict))
        current_components = updated_components.reduceByKey(lambda x, y: min(x, y))  # Reduce to keep smallest ID

        # Check for convergence
        converged = has_converged(prev_components, current_components)
        prev_components = current_components

    final_components = prev_components

    # Calculate the size of each connected component
    component_sizes = final_components.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)

    # Find the largest component by size
    largest_component = component_sizes.max(key=lambda x: x[1])
    largest_component_airports = final_components.filter(lambda x: x[1] == largest_component[0]).map(lambda x: x[0]).collect()

    # Return results
    num_components = component_sizes.count()  # Total number of connected components
    component_sizes_list = component_sizes.collect()  # List of component sizes

    return {
        "num_components":             num_components,
        "component_sizes":            component_sizes_list,
        "largest_component_airports": largest_component_airports
    }

def run_connected_components(csv_file, start_date, end_date):

    # Executes the full process of identifying connected components in the flight network

    # Preprocess the data to extract graph edges
    flight_edges = preprocess_data(csv_file, start_date, end_date)

    # Run the connected components algorithm
    result = connected_components_mapreduce(flight_edges)

    # Output results
    print("Number of connected components:", result["num_components"])
    print("Sizes of each component:", result["component_sizes"])
    print("Airports in the largest component:", result["largest_component_airports"])

## Example of usage

In [2]:
# Example usage
csv_file = "/content/drive/MyDrive/Colab Notebooks/ADM Homeworks/Homework 5/Airports2.csv"  # Path to the dataset
start_date = "1990-01-01"
end_date = "1990-12-31"

run_connected_components(csv_file, start_date, end_date)

Number of connected components: 65
Sizes of each component: [('BLI', 2), ('EUG', 2), ('ABQ', 35), ('ABE', 46), ('ABI', 10), ('EKO', 1), ('ATL', 24), ('BDL', 7), ('ELM', 1), ('AVP', 1), ('ANC', 9), ('HNL', 2), ('AMA', 1), ('ACV', 6), ('ACT', 1), ('BWI', 7), ('AGS', 7), ('CAK', 1), ('BIL', 10), ('BFL', 4), ('ALB', 6), ('ADQ', 2), ('FAR', 1), ('AZO', 4), ('ATW', 4), ('ACY', 2), ('MIA', 2), ('EYW', 1), ('CYS', 1), ('IDA', 1), ('DFW', 3), ('CID', 1), ('EAT', 1), ('BOS', 1), ('CPR', 1), ('DAY', 1), ('CMH', 1), ('BFI', 2), ('FAT', 2), ('CEC', 1), ('GPT', 1), ('AIY', 1), ('FOE', 1), ('FCA', 2), ('ALW', 3), ('CLM', 1), ('BHM', 1), ('CVG', 1), ('BIF', 1), ('EFD', 1), ('FLG', 1), ('AVL', 1), ('CHO', 1), ('GEG', 2), ('GCC', 1), ('FFO', 1), ('FMN', 1), ('EWN', 2), ('BGM', 1), ('FAI', 1), ('SPI', 1), ('BRO', 1), ('ADM', 1), ('BFF', 1), ('NGP', 1)]
Airports in the largest component: ['SEA', 'PIT', 'CVG', 'BOS', 'ATL', 'ORD', 'MDW', 'PHL', 'DTW', 'MSP', 'CLT', 'EWR', 'BWI', 'LGA', 'MDT', 'ROC', 'IND',

2) Compare the execution time and the results of your implementation with those of the GraphFrames package for identifying connected components. If there is any difference in the results, provide an explanation for why that might occur.


First of all, I install the GraphFrames package

In [4]:
!pip install graphframes



The function "***run_graphframes_connected_components***" has this parameter:

*   **flight_edges**: List of tuples representing the edges of the graph.

And returns a dictionary containing the results from GraphFrames.

In [7]:
from pyspark.sql import SparkSession

def run_graphframes_connected_components(flight_edges):

    # Identifies connected components using the GraphFrames package.
    # Check if a SparkSession already exists
    spark = SparkSession.builder \
        .appName("GraphFramesConnectedComponents") \
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
        .getOrCreate()

    # Create vertices and edges DataFrame
    airports = list(set([airport for edge in flight_edges for airport in edge]))
    vertices = spark.createDataFrame([(a,) for a in airports], ["id"])
    edges = spark.createDataFrame(flight_edges, ["src", "dst"])

    # Create GraphFrame
    from graphframes import GraphFrame
    g = GraphFrame(vertices, edges)

    # Run connected components
    result = g.connectedComponents()

    # Collect results
    component_sizes = result.groupBy("component").count().collect()
    largest_component = max(component_sizes, key=lambda x: x["count"])
    largest_component_airports = result.filter(col("component") == largest_component["component"]).select("id").collect()

    return {
        "num_components": len(component_sizes),
        "component_sizes": [(row["component"], row["count"]) for row in component_sizes],
        "largest_component_airports": [row["id"] for row in largest_component_airports]
    }

## Compare Execution Times

In [8]:
# Close existing SparkSession to avoid multiple contexts
from pyspark import SparkContext
if SparkContext._active_spark_context:
    SparkContext._active_spark_context.stop()

# Preprocess the data to get the graph edges
flight_edges = preprocess_data(csv_file, start_date, end_date)

# Measure execution time of the custom MapReduce implementation
start_time = time.time()
custom_result = connected_components_mapreduce(flight_edges)
custom_time = time.time() - start_time

# Measure execution time of GraphFrames
start_time = time.time()
graphframes_result = run_graphframes_connected_components(flight_edges)
graphframes_time = time.time() - start_time

# Print results
print("Custom Implementation:")
print(f"Execution Time: {custom_time:.2f} seconds")
print(f"Number of Components: {custom_result['num_components']}")
print(f"Sizes of Components: {custom_result['component_sizes']}")
print(f"Largest Component Airports: {custom_result['largest_component_airports']}")

print("\nGraphFrames Implementation:")
print(f"Execution Time: {graphframes_time:.2f} seconds")
print(f"Number of Components: {graphframes_result['num_components']}")
print(f"Sizes of Components: {graphframes_result['component_sizes']}")
print(f"Largest Component Airports: {graphframes_result['largest_component_airports']}")



Py4JJavaError: An error occurred while calling o347.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
