In [None]:
!pip install python-dotenv
!pip install pyspark
!pip install gdown

import os
from itertools import permutations
from dotenv import load_dotenv
from pyspark import RDD, SparkContext
from pyspark.sql import DataFrame, SparkSession

### install dependency ###
# pip install python-dotenv
# pip install pyspark # make sure you have jdk installed
#####################################

### Default input locations: update these paths to match your environment ###
# Alternative text input; not referenced anywhere else in the visible code.
temp_airline_textfile2 = r"./test.txt"
# Default Problem 1 input, resolved relative to the working directory.
# The file must exist there, otherwise reading it through Spark fails with
# "Input path does not exist".
temp_airline_textfile = r"./flights_data.txt"
# temp_airline_csvfile = r"./Combined_Flights_2021.csv"
# Notebook shell command: downloads the flights CSV into the working directory.
!gdown --id 1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek -O Combined_Flights_2021.csv
# Default Problem 2 input. This is an absolute path, so it only matches the
# download above when the working directory is /content.
temp_airline_csvfile = r"/content/Combined_Flights_2021.csv"  # Path to the downloaded file
default_spark_context = "local[*]"  # Spark master URL; only update if you need
#######################################

### please don't update these lines ###
# Load variables from a .env file (if any) into the process environment, then
# let AIRLINE_TXT_FILE / AIRLINE_CSV_FILE / SPARK_CONTEXT override the
# defaults defined above. When a variable is unset, the default is used.
load_dotenv()
airline_textfile = os.getenv("AIRLINE_TXT_FILE", temp_airline_textfile)
airline_csvfile = os.getenv("AIRLINE_CSV_FILE", temp_airline_csvfile)
spark_context = os.getenv("SPARK_CONTEXT", default_spark_context)
#######################################


#######################################


def co_occurring_airline_pairs_by_origin(flights_data: RDD) -> RDD:
    """
    Count, for every pair of airlines, the number of (date, origin) groups
    in which both airlines appear.

    Each input line is split on commas; field 0 is used as the date, field 1
    as the airline and field 2 as the origin. Lines with fewer than three
    fields (for example blank lines) are skipped instead of raising
    ``IndexError``.

    Every unordered airline pair is counted exactly once per (date, origin)
    group, and each pair is emitted as ``(smaller, larger)`` in alphabetical
    order. The result is sorted by count descending, with ties broken by the
    pair in ascending alphabetical order.

    :param flights_data: RDD of text lines of the flights data
    :return: RDD of ``((airline_a, airline_b), count)`` tuples
    Example output: [((Airline_A, Airline_B), 4),
                     ((Airline_A, Airline_C), 2),
                     ((Airline_B, Airline_C), 2)]
    """
    # Imported locally so the function does not rely on the file-level
    # import block, which only brings in ``permutations``.
    from itertools import combinations

    # Step 1: parse lines into ((date, origin), airline) records.
    parsed_data = (
        flights_data.map(lambda line: [field.strip() for field in line.split(",")])
        .filter(lambda fields: len(fields) >= 3)  # drop blank/malformed lines
        .map(lambda fields: ((fields[0], fields[2]), fields[1]))
    )

    # Step 2: keep one record per airline in each (date, origin) group, then
    # gather the airlines of each group. Deduplicating before the grouping
    # keeps repeated flights of the same airline out of the shuffle.
    grouped_airlines = parsed_data.distinct().groupByKey()

    # Step 3: emit each unordered pair once per group. ``combinations`` over
    # the sorted airlines yields (a, b) with a < b only; the previous
    # ``permutations`` + min/max approach produced every pair twice and so
    # doubled all counts.
    airline_pairs = grouped_airlines.flatMap(
        lambda group: [(pair, 1) for pair in combinations(sorted(group[1]), 2)]
    )

    # Step 4: total the occurrences of each pair across all groups.
    pair_counts = airline_pairs.reduceByKey(lambda x, y: x + y)

    # Step 5: highest count first; ties ordered alphabetically by pair.
    return pair_counts.sortBy(lambda item: (-item[1], item[0]))


def air_flights_most_canceled_flights(flights: DataFrame) -> str:
    """
    Finds the airline with the most canceled flights in January.

    Only ``Month == 1`` and ``Cancelled == 1`` are checked; the year is not
    filtered here, so the result refers to January 2021 only if the input
    holds 2021 data exclusively (assumed from the input file name -- confirm).

    :param flights: Spark DataFrame containing the flight data
    :return: The name of the airline with the most canceled flights in January;
             if several airlines share the highest count, the alphabetically
             first one is returned so the answer is deterministic
    :raises ValueError: if no canceled January flights exist in ``flights``
    """
    # Keep only canceled flights that took place in January
    jan_canceled_flights = flights.filter((flights.Month == 1) & (flights.Cancelled == 1))

    # Number of canceled flights per airline
    canceled_counts = jan_canceled_flights.groupBy("Airline").count()

    # Highest count first; the airline name breaks ties deterministically
    most_canceled_airline = canceled_counts.orderBy(
        canceled_counts["count"].desc(), canceled_counts["Airline"].asc()
    ).first()

    # first() yields None on an empty result; fail with a clear message
    # instead of an opaque "'NoneType' object is not subscriptable".
    if most_canceled_airline is None:
        raise ValueError("No canceled flights found for January in the given data.")

    return most_canceled_airline["Airline"]


def air_flights_diverted_flights(flights: DataFrame) -> int:
    """
    Counts the diverted flights whose date falls on the 1st-30th of November.

    A row is counted when ``Month`` is 11, ``DayofMonth`` lies in the
    inclusive range 1..30 and ``Diverted`` equals 1.

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The number of diverted flights as an integer.
    """
    # Build each condition separately so the final filter reads like the question
    is_november = flights["Month"] == 11
    in_day_range = flights["DayofMonth"].between(1, 30)  # inclusive on both ends
    was_diverted = flights["Diverted"] == 1

    return flights.where(is_november & in_day_range & was_diverted).count()





def air_flights_avg_airtime(flights: DataFrame) -> float:
    """
    Computes the mean ``AirTime`` of flights whose origin city is
    Los Angeles, CA and whose destination city is New York, NY.

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The average airtime as a float; 0.0 when the aggregate is
             empty (no matching flights) or evaluates to zero.
    """
    # Restrict the data to the Los Angeles -> New York route
    route_flights = flights.where(
        (flights["OriginCityName"] == "Los Angeles, CA")
        & (flights["DestCityName"] == "New York, NY")
    )

    # The aggregation produces a single row holding the average in column 0
    summary_row = route_flights.agg({"AirTime": "avg"}).first()
    mean_airtime = summary_row[0]

    # A missing (None) or zero average is reported as 0.0
    if not mean_airtime:
        return 0.0
    return float(mean_airtime)




def air_flights_missing_departure_time(flights: DataFrame) -> int:
    """
    Counts the distinct ``FlightDate`` values that have at least one flight
    with a null departure time (``DepTime``).

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The number of unique days as an integer where DepTime is missing.
    """
    # Dates of all flights without a recorded departure time, deduplicated
    dates_without_dep_time = (
        flights.where(flights["DepTime"].isNull()).select("FlightDate").distinct()
    )

    return dates_without_dep_time.count()





def main():
    """
    Runs both assignment problems and prints their results.

    Problem 1 reads ``airline_textfile`` as an RDD and prints the first ten
    airline pairs with their co-occurrence counts. Problem 2 reads
    ``airline_csvfile`` as a DataFrame and prints the answers to Q1-Q4.

    The Spark session is obtained with ``getOrCreate`` and always stopped on
    exit, so the function can be re-run in the same interpreter (for example
    a notebook) even after a previous run failed. Creating a second
    ``SparkContext`` directly would raise an error in that situation.
    """
    # Reuse a running session if there is one; otherwise start one on the
    # configured master. The SparkContext is taken from the session.
    spark = SparkSession.builder.master(spark_context).getOrCreate()
    sc = spark.sparkContext

    try:
        print("########################## Problem 1 ########################")
        # problem 1: co-occurring operating flights with Spark and MapReduce
        # read the file
        flights_data = sc.textFile(airline_textfile)
        sorted_airline_pairs = co_occurring_airline_pairs_by_origin(flights_data)
        sorted_airline_pairs.persist()
        for pair, count in sorted_airline_pairs.take(10):
            print(f"{pair}: {count}")

        print("########################## Problem 2 ########################")
        # problem 2: PySpark DataFrame operations
        # read the file
        flights = spark.read.csv(airline_csvfile, header=True, inferSchema=True)
        print(
            "Q1:",
            air_flights_most_canceled_flights(flights),
            "had the most canceled flights in January 2021.",
        )

        print(
            "Q2:",
            air_flights_diverted_flights(flights),
            "flights were diverted between the period of 1st-30th November 2021.",
        )

        print(
            "Q3:",
            air_flights_avg_airtime(flights),
            "is the average airtime for flights that were flying from "
            "Los Angeles, CA to New York, NY",
        )

        print(
            "Q4:",
            air_flights_missing_departure_time(flights),
            "unique dates where departure time (DepTime) was not recorded.",
        )
    finally:
        # Release the Spark resources even when a step above raised
        # (e.g. a missing input file), so the next run starts cleanly.
        spark.stop()


if __name__ == "__main__":
    main()


Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1
Downloading...
From (original): https://drive.google.com/uc?id=1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek
From (redirected): https://drive.google.com/uc?id=1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek&confirm=t&uuid=5c14a589-77c8-440f-af40-df480747f796
To: /content/Combined_Flights_2021.csv
100% 2.21G/2.21G [00:14<00:00, 152MB/s]
########################## Problem 1 ########################


Py4JJavaError: An error occurred while calling o27.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/content/flights_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Input path does not exist: file:/content/flights_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [2]:
!pip install python-dotenv
!pip install pyspark
!pip install gdown

import os
from itertools import permutations
from dotenv import load_dotenv
from pyspark import RDD, SparkContext
from pyspark.sql import DataFrame, SparkSession

### install dependency ###
# pip install python-dotenv
# pip install pyspark # make sure you have jdk installed
#####################################

### Default input locations: update these paths to match your environment ###
# Alternative text input; not referenced anywhere else in the visible code.
temp_airline_textfile2 = r"./test.txt"
# Default Problem 1 input, resolved relative to the working directory.
# The file must exist there, otherwise reading it through Spark fails with
# "Input path does not exist".
temp_airline_textfile = r"./flights_data.txt"
# temp_airline_csvfile = r"./Combined_Flights_2021.csv"
# Notebook shell command: downloads the flights CSV into the working directory.
!gdown --id 1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek -O Combined_Flights_2021.csv
# Default Problem 2 input. This is an absolute path, so it only matches the
# download above when the working directory is /content.
temp_airline_csvfile = r"/content/Combined_Flights_2021.csv"  # Path to the downloaded file
default_spark_context = "local[*]"  # Spark master URL; only update if you need
#######################################

### please don't update these lines ###
# Load variables from a .env file (if any) into the process environment, then
# let AIRLINE_TXT_FILE / AIRLINE_CSV_FILE / SPARK_CONTEXT override the
# defaults defined above. When a variable is unset, the default is used.
load_dotenv()
airline_textfile = os.getenv("AIRLINE_TXT_FILE", temp_airline_textfile)
airline_csvfile = os.getenv("AIRLINE_CSV_FILE", temp_airline_csvfile)
spark_context = os.getenv("SPARK_CONTEXT", default_spark_context)
#######################################


#######################################


def co_occurring_airline_pairs_by_origin(flights_data: RDD) -> RDD:
    """
    Count, for every pair of airlines, the number of (date, origin) groups
    in which both airlines appear.

    Each input line is split on commas; field 0 is used as the date, field 1
    as the airline and field 2 as the origin. Lines with fewer than three
    fields (for example blank lines) are skipped instead of raising
    ``IndexError``.

    Every unordered airline pair is counted exactly once per (date, origin)
    group, and each pair is emitted as ``(smaller, larger)`` in alphabetical
    order. The result is sorted by count descending, with ties broken by the
    pair in ascending alphabetical order.

    :param flights_data: RDD of text lines of the flights data
    :return: RDD of ``((airline_a, airline_b), count)`` tuples
    Example output: [((Airline_A, Airline_B), 4),
                     ((Airline_A, Airline_C), 2),
                     ((Airline_B, Airline_C), 2)]
    """
    # Imported locally so the function does not rely on the file-level
    # import block, which only brings in ``permutations``.
    from itertools import combinations

    # Step 1: parse lines into ((date, origin), airline) records.
    parsed_data = (
        flights_data.map(lambda line: [field.strip() for field in line.split(",")])
        .filter(lambda fields: len(fields) >= 3)  # drop blank/malformed lines
        .map(lambda fields: ((fields[0], fields[2]), fields[1]))
    )

    # Step 2: keep one record per airline in each (date, origin) group, then
    # gather the airlines of each group. Deduplicating before the grouping
    # keeps repeated flights of the same airline out of the shuffle.
    grouped_airlines = parsed_data.distinct().groupByKey()

    # Step 3: emit each unordered pair once per group. ``combinations`` over
    # the sorted airlines yields (a, b) with a < b only; the previous
    # ``permutations`` + min/max approach produced every pair twice and so
    # doubled all counts.
    airline_pairs = grouped_airlines.flatMap(
        lambda group: [(pair, 1) for pair in combinations(sorted(group[1]), 2)]
    )

    # Step 4: total the occurrences of each pair across all groups.
    pair_counts = airline_pairs.reduceByKey(lambda x, y: x + y)

    # Step 5: highest count first; ties ordered alphabetically by pair.
    return pair_counts.sortBy(lambda item: (-item[1], item[0]))


def air_flights_most_canceled_flights(flights: DataFrame) -> str:
    """
    Finds the airline with the most canceled flights in January.

    Only ``Month == 1`` and ``Cancelled == 1`` are checked; the year is not
    filtered here, so the result refers to January 2021 only if the input
    holds 2021 data exclusively (assumed from the input file name -- confirm).

    :param flights: Spark DataFrame containing the flight data
    :return: The name of the airline with the most canceled flights in January;
             if several airlines share the highest count, the alphabetically
             first one is returned so the answer is deterministic
    :raises ValueError: if no canceled January flights exist in ``flights``
    """
    # Keep only canceled flights that took place in January
    jan_canceled_flights = flights.filter((flights.Month == 1) & (flights.Cancelled == 1))

    # Number of canceled flights per airline
    canceled_counts = jan_canceled_flights.groupBy("Airline").count()

    # Highest count first; the airline name breaks ties deterministically
    most_canceled_airline = canceled_counts.orderBy(
        canceled_counts["count"].desc(), canceled_counts["Airline"].asc()
    ).first()

    # first() yields None on an empty result; fail with a clear message
    # instead of an opaque "'NoneType' object is not subscriptable".
    if most_canceled_airline is None:
        raise ValueError("No canceled flights found for January in the given data.")

    return most_canceled_airline["Airline"]


def air_flights_diverted_flights(flights: DataFrame) -> int:
    """
    Counts the diverted flights whose date falls on the 1st-30th of November.

    A row is counted when ``Month`` is 11, ``DayofMonth`` lies in the
    inclusive range 1..30 and ``Diverted`` equals 1.

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The number of diverted flights as an integer.
    """
    # Build each condition separately so the final filter reads like the question
    is_november = flights["Month"] == 11
    in_day_range = flights["DayofMonth"].between(1, 30)  # inclusive on both ends
    was_diverted = flights["Diverted"] == 1

    return flights.where(is_november & in_day_range & was_diverted).count()





def air_flights_avg_airtime(flights: DataFrame) -> float:
    """
    Computes the mean ``AirTime`` of flights whose origin city is
    Los Angeles, CA and whose destination city is New York, NY.

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The average airtime as a float; 0.0 when the aggregate is
             empty (no matching flights) or evaluates to zero.
    """
    # Restrict the data to the Los Angeles -> New York route
    route_flights = flights.where(
        (flights["OriginCityName"] == "Los Angeles, CA")
        & (flights["DestCityName"] == "New York, NY")
    )

    # The aggregation produces a single row holding the average in column 0
    summary_row = route_flights.agg({"AirTime": "avg"}).first()
    mean_airtime = summary_row[0]

    # A missing (None) or zero average is reported as 0.0
    if not mean_airtime:
        return 0.0
    return float(mean_airtime)




def air_flights_missing_departure_time(flights: DataFrame) -> int:
    """
    Counts the distinct ``FlightDate`` values that have at least one flight
    with a null departure time (``DepTime``).

    :param flights: Spark DataFrame of the flights CSV file.
    :return: The number of unique days as an integer where DepTime is missing.
    """
    # Dates of all flights without a recorded departure time, deduplicated
    dates_without_dep_time = (
        flights.where(flights["DepTime"].isNull()).select("FlightDate").distinct()
    )

    return dates_without_dep_time.count()





def main():
    """
    Runs both assignment problems and prints their results.

    Problem 1 reads ``airline_textfile`` as an RDD and prints the first ten
    airline pairs with their co-occurrence counts. Problem 2 reads
    ``airline_csvfile`` as a DataFrame and prints the answers to Q1-Q4.

    The Spark session is obtained with ``getOrCreate`` and always stopped on
    exit, so the function can be re-run in the same interpreter (for example
    a notebook) even after a previous run failed. Creating a second
    ``SparkContext`` directly would raise an error in that situation.
    """
    # Reuse a running session if there is one; otherwise start one on the
    # configured master. The SparkContext is taken from the session.
    spark = SparkSession.builder.master(spark_context).getOrCreate()
    sc = spark.sparkContext

    try:
        print("########################## Problem 1 ########################")
        # problem 1: co-occurring operating flights with Spark and MapReduce
        # read the file
        flights_data = sc.textFile(airline_textfile)
        sorted_airline_pairs = co_occurring_airline_pairs_by_origin(flights_data)
        sorted_airline_pairs.persist()
        for pair, count in sorted_airline_pairs.take(10):
            print(f"{pair}: {count}")

        print("########################## Problem 2 ########################")
        # problem 2: PySpark DataFrame operations
        # read the file
        flights = spark.read.csv(airline_csvfile, header=True, inferSchema=True)
        print(
            "Q1:",
            air_flights_most_canceled_flights(flights),
            "had the most canceled flights in January 2021.",
        )

        print(
            "Q2:",
            air_flights_diverted_flights(flights),
            "flights were diverted between the period of 1st-30th November 2021.",
        )

        print(
            "Q3:",
            air_flights_avg_airtime(flights),
            "is the average airtime for flights that were flying from "
            "Los Angeles, CA to New York, NY",
        )

        print(
            "Q4:",
            air_flights_missing_departure_time(flights),
            "unique dates where departure time (DepTime) was not recorded.",
        )
    finally:
        # Release the Spark resources even when a step above raised
        # (e.g. a missing input file), so the next run starts cleanly.
        spark.stop()


if __name__ == "__main__":
    main()


Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1
Downloading...
From (original): https://drive.google.com/uc?id=1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek
From (redirected): https://drive.google.com/uc?id=1JYOFdBSnKQcB24tnQY0HzwsJB4nf2Eek&confirm=t&uuid=5c14a589-77c8-440f-af40-df480747f796
To: /content/Combined_Flights_2021.csv
100% 2.21G/2.21G [00:14<00:00, 152MB/s]
########################## Problem 1 ########################


Py4JJavaError: An error occurred while calling o27.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/content/flights_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Input path does not exist: file:/content/flights_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more
