## Bonus Question - Connected Components on MapReduce

In [1]:
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from collections import defaultdict
import community as community_louvain

In [8]:
from pyspark import SparkContext, SparkConf
from datetime import datetime

def find_connected_components(flight_data, start_date, end_date):
    # Convert input dates to datetime
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")
    
    # Step 1: Filter data for the date range
    filtered_data = flight_data.filter(lambda x: start_date <= datetime.strptime(x['Fly_date'], "%Y-%m-%d") <= end_date)

    # Step 2: Create the adjacency list (Graph representation)
    edges = filtered_data.flatMap(lambda x: [(x['Origin_airport'], x['Destination_airport']),
                                             (x['Destination_airport'], x['Origin_airport'])]).distinct()

    # Step 3: Initialize each node with itself as the label
    nodes = edges.flatMap(lambda x: [x[0], x[1]]).distinct()
    labels = nodes.map(lambda x: (x, x))

    # Step 4: Iteratively propagate labels until convergence
    for _ in range(10):  # Limiting iterations to 10 as a stopping condition
        # Join edges with labels
        joined = edges.join(labels).map(lambda x: (x[1][0], x[1][1]))
        
        # Reduce by the smallest label in each neighborhood
        new_labels = joined.reduceByKey(lambda x, y: min(x, y))
        
        # Update labels
        labels = labels.join(new_labels).mapValues(lambda x: min(x))

    # Step 5: Group nodes by their labels to find connected components
    connected_components = labels.groupByKey().mapValues(list)
    
    # Number of connected components
    num_components = connected_components.count()
    
    # Size of each component
    component_sizes = connected_components.mapValues(len).collect()
    
    # Find the largest connected component
    largest_component = connected_components.mapValues(len).max(key=lambda x: x[1])
    largest_component_airports = connected_components.filter(lambda x: x[0] == largest_component[0]).flatMap(lambda x: x[1]).collect()

    return num_components, component_sizes, largest_component_airports

# Example Usage
if __name__ == "__main__":
    conf = SparkConf().setAppName("ConnectedComponents").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Load flight data as an RDD of dictionaries
    flight_data = sc.parallelize([
        {'Origin_airport': 'A', 'Destination_airport': 'B', 'Fly_date': '2024-01-01'},
        {'Origin_airport': 'B', 'Destination_airport': 'C', 'Fly_date': '2024-01-02'},
        {'Origin_airport': 'D', 'Destination_airport': 'E', 'Fly_date': '2024-01-03'},
        # Add more entries as needed
    ])

    # Define the date range
    start_date = "2024-01-01"
    end_date = "2024-01-31"

    # Call the function
    num_components, component_sizes, largest_component_airports = find_connected_components(flight_data, start_date, end_date)
    print("Number of connected components:", num_components)
    print("Size of each component:", component_sizes)
    print("Airports in the largest connected component:", largest_component_airports)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/28 21:18:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Number of connected components: 5
Size of each component: [('B', 1), ('A', 1), ('C', 1), ('E', 1), ('D', 1)]
Airports in the largest connected component: ['A']


                                                                                

In [12]:
import time

start_time = time.time()
num_components, component_sizes, largest_component_airports = find_connected_components(flight_data, start_date, end_date)
custom_time = time.time() - start_time
print(f"Custom Implementation Execution Time: {custom_time} seconds")




Custom Implementation Execution Time: 1628.475653886795 seconds


                                                                                

In [26]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

def graphframes_connected_components(flight_data, start_date, end_date):
    # Convert input dates to datetime
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    # Filter the flight data by the date range
    filtered_data = flight_data.filter(lambda x: start_date <= datetime.strptime(x['Fly_date'], "%Y-%m-%d") <= end_date)

    # Create the edges DataFrame
    edges_df = filtered_data.map(lambda x: (x['Origin_airport'], x['Destination_airport'])).distinct()
    edges_df = spark.createDataFrame(edges_df, ["src", "dst"])

    # Create the vertices DataFrame
    vertices_df = filtered_data.flatMap(lambda x: [x['Origin_airport'], x['Destination_airport']]).distinct()
    vertices_df = spark.createDataFrame(vertices_df.map(lambda x: (x,)), ["id"])

    # Create the GraphFrame
    graph = GraphFrame(vertices_df, edges_df)

    # Run connected components
    result = graph.connectedComponents()

    return result

# Example Usage with Timing
if __name__ == "__main__":
    spark = SparkSession.builder.appName("ConnectedComponentsGraphFrames").getOrCreate()
    
    # Load flight data as an RDD of dictionaries
    flight_data = sc.parallelize([
        {'Origin_airport': 'A', 'Destination_airport': 'B', 'Fly_date': '2024-01-01'},
        {'Origin_airport': 'B', 'Destination_airport': 'C', 'Fly_date': '2024-01-02'},
        {'Origin_airport': 'D', 'Destination_airport': 'E', 'Fly_date': '2024-01-03'},
        # Add more entries as needed
    ])

    start_date = "2024-01-01"
    end_date = "2024-01-31"

    # Measure execution time for GraphFrames
    start_time = time.time()
    result_df = graphframes_connected_components(flight_data, start_date, end_date)
    graphframes_time = time.time() - start_time
    print(f"GraphFrames Execution Time: {graphframes_time} seconds")


24/12/28 22:20:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Py4JJavaError: An error occurred while calling o1955.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
