## Description 

This PySpark application analyzes scheduled aircraft landings to estimate the operational complexity of airspace management in busy airport regions. The system focuses on the 10 busiest airports (by arrival volume) and treats each as the center of a regional airspace zone with a 200-mile radius.

The final goal is to quantify the complexity of landings by counting all possible landing sequences, assuming that each airport region has at most 5 runways. Essentially, given the aircraft arriving in the same airspace region within a one-hour interval, how many distinct landing sequences are possible?
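
As a rough illustration of the counting rule, the sketch below restates the recurrence used by the notebook's `sequence` function under a hypothetical name, `count_groupings`, with the runway limit pulled out into an assumed constant `MAX_GROUP`. It fixes one aircraft, chooses which of the others land together with it, and recurses on the rest.

```python
from math import comb

MAX_GROUP = 5  # assumption: at most 5 aircraft can land together, one per runway


def count_groupings(n):
    """Count the ways to split n distinct aircraft into landing groups of size <= MAX_GROUP."""
    if n == 0:
        return 1
    # Fix one aircraft, pick g - 1 of the remaining n - 1 to share its group,
    # then count the groupings of the n - g aircraft that are left.
    return sum(
        comb(n - 1, g - 1) * count_groupings(n - g)
        for g in range(1, min(MAX_GROUP, n) + 1)
    )


for n in range(1, 7):
    print(n, count_groupings(n))
```

For 1, 2, and 3 landings this yields 1, 2, and 5, which matches the values visible in the notebook output further down.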


**Do not change the code below.**

In [12]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()


In [13]:
sc = spark.sparkContext

**Edit the following code cell only for Questions 2 and 3.**

In [14]:
import csv
from datetime import datetime
from math import radians, cos, sin, asin, sqrt, comb

def parse_csv(path):
    return sc.textFile(path).mapPartitionsWithIndex(
        lambda idx, it: iter(list(it)[1:]) if idx == 0 else it
    ).map(lambda line: next(csv.reader([line])))

def parse_time(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")

# Haversine distance in miles
def haversine(lat1, lon1, lat2, lon2):
    R = 3956  # radius of Earth in miles
    dlat, dlon = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    return 2 * R * asin(sqrt(a))

# Total number of landing sequences (at most 5 planes can land simultaneously)
def sequence(n):
    if n == 0: return 1
    return sum(comb(n - 1, g - 1) * sequence(n - g) for g in range(1, min(5, n) + 1))

*Do not change the following code cells.*

In [15]:
# Load airports and flights
airports_rdd = parse_csv("/data/airports_data.csv").map(
    lambda row: (row[0], (float(row[3]), float(row[4])))  # airport_code -> (lat, lon)
).cache()

flights_rdd = parse_csv("/data/flights.csv").filter(lambda x : x[9] !='').map(
    lambda row: (row[5], parse_time(row[9]))  # (arrival_airport, actual_arrival)
)


In [16]:
# Step 1: Get top 10 airports by arrival volume
top_airports = flights_rdd.map(lambda x: (x[0], 1)) \
                          .reduceByKey(lambda a, b: a + b) \
                          .takeOrdered(10, key=lambda x: -x[1])
top_airport_codes = set([a[0] for a in top_airports])
top_airport_codes_bc = sc.broadcast(top_airport_codes)

                                                                                

In [17]:
# Step 2: Create (top_airport, [nearby_airports_within_radius])
X = 200  # miles
top_airports_rdd = airports_rdd.filter(lambda x: x[0] in top_airport_codes_bc.value)

# Join each top airport with every airport to compute distance
region_airport_map = top_airports_rdd.cartesian(airports_rdd) \
    .filter(lambda pair: haversine(pair[0][1][0], pair[0][1][1], pair[1][1][0], pair[1][1][1]) <= X) \
    .map(lambda pair: (pair[0][0], pair[1][0]))  # (region_center_airport, nearby_airport)

In [18]:
# Step 3: Invert map to (airport, [region_centers])
airport_to_regions = region_airport_map.map(lambda x: (x[1], x[0])) \
                                       .groupByKey().mapValues(set)

In [19]:
# Step 4: Join flight arrivals with region mapping

flights_by_region = flights_rdd.join(airport_to_regions)

In [20]:
# Step 5: Create hourly keys (region_center, date, hour) → 1 for each region the arrival belongs to
def assign_to_regions(record):
    airport, (arrival_time, regions) = record
    date = arrival_time.strftime("%Y-%m-%d")
    hour = arrival_time.strftime("%H")
    return [((region, date, hour), 1) for region in regions]

region_hour_counts = flights_by_region.flatMap(assign_to_regions) \
                                      .reduceByKey(lambda x, y: x + y)
# Output: (region_center_airport, date, hour) count_of_landings
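
To make the key layout concrete, here is a small standalone sketch that applies the same `assign_to_regions` logic to a single record outside Spark. The sample record (airport code, timestamp, and region set) is made up for illustration.

```python
from datetime import datetime


def assign_to_regions(record):
    # Same logic as the notebook cell: one ((region, date, hour), 1) pair per region
    airport, (arrival_time, regions) = record
    date = arrival_time.strftime("%Y-%m-%d")
    hour = arrival_time.strftime("%H")
    return [((region, date, hour), 1) for region in regions]


# Hypothetical joined record: (airport, (arrival_time, {region centers covering it}))
record = ("VKO", (datetime(2017, 8, 10, 13, 25), {"VKO", "SVO"}))

# sorted() is used only because set iteration order is not fixed
for key, one in sorted(assign_to_regions(record)):
    print(key, one)
```

A single arrival therefore contributes to every region whose 200-mile radius covers its airport, and `reduceByKey` then sums the ones per `(region, date, hour)` key.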

In [21]:
from pyspark.rdd import portable_hash

num_partitions = 50
def region_hour_partitioner(key):
    region, date, hour = key
    return portable_hash(region) % num_partitions

# Enforces 50 partitions based on the airport code.
region_hour_counts = region_hour_counts.partitionBy(num_partitions, region_hour_partitioner) 

### Answer the questions below. 

In [22]:
region_hour_counts.sortBy(lambda x: x[1], ascending=False).collect()

                                                                                

[(('DME', '2017-08-10', '13'), 19),
 (('DME', '2017-08-01', '13'), 19),
 (('VKO', '2017-08-10', '13'), 19),
 (('VKO', '2017-08-01', '13'), 19),
 (('SVO', '2017-08-01', '13'), 19),
 (('SVO', '2017-08-10', '13'), 19),
 (('DME', '2017-07-27', '13'), 18),
 (('DME', '2017-07-16', '13'), 18),
 (('DME', '2017-07-18', '13'), 18),
 (('DME', '2017-08-08', '13'), 18),
 (('VKO', '2017-07-18', '13'), 18),
 (('VKO', '2017-07-27', '13'), 18),
 (('VKO', '2017-07-16', '13'), 18),
 (('VKO', '2017-08-08', '13'), 18),
 (('SVO', '2017-08-12', '12'), 18),
 (('SVO', '2017-07-16', '13'), 18),
 (('SVO', '2017-08-08', '13'), 18),
 (('SVO', '2017-07-18', '13'), 18),
 (('SVO', '2017-07-27', '13'), 18),
 (('DME', '2017-07-30', '13'), 17),
 (('DME', '2017-08-13', '13'), 17),
 (('DME', '2017-07-31', '13'), 17),
 (('DME', '2017-08-14', '13'), 17),
 (('DME', '2017-08-05', '11'), 17),
 (('DME', '2017-07-23', '13'), 17),
 (('DME', '2017-07-22', '11'), 17),
 (('DME', '2017-07-20', '13'), 17),
 (('DME', '2017-08-04', '13'

## Question 1.

Run the following cell and identify why it takes a prohibitively long time or, on some machines, never terminates. Provide concrete measurements (tasks, job runtime, and other metrics) to justify your answer. You are also welcome to inspect the output data by writing additional operators.
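
One possible way to collect a measurement outside the Spark UI is to count how many invocations a naive recursion of this shape makes for a given input. The helper below, `count_calls`, is hypothetical and not part of the notebook. It mirrors only the branching structure of `sequence` (up to 5 recursive calls per level) and ignores the arithmetic.

```python
def count_calls(n, max_group=5):
    """Number of function invocations a naive recursion with the same
    branching as `sequence` performs for input n (this call included)."""
    if n == 0:
        return 1
    return 1 + sum(count_calls(n - g, max_group) for g in range(1, min(max_group, n) + 1))


# Small inputs only: the helper is itself exponential in n
for n in (5, 10, 15):
    print(f"n={n}: {count_calls(n)} calls")
```

Comparing these counts against the landing counts in `region_hour_counts` can help relate per-record cost to task runtime.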


In [23]:
# Step 6: Compute Count of Landing Sequences
region_hour_date_seq = region_hour_counts.map(lambda x: (x[0][0], x[0][1], x[1], sequence(x[1])))
# Output: (region_center_airport, date, count_of_landings, landing_sequences)
region_hour_date_seq.collect()

                                                                                

[('KJA', '2017-08-13', 1, 1),
 ('KJA', '2017-07-26', 1, 1),
 ('KJA', '2017-08-13', 2, 2),
 ('KJA', '2017-08-04', 2, 2),
 ('KJA', '2017-07-30', 2, 2),
 ('KJA', '2017-07-28', 2, 2),
 ('KJA', '2017-07-17', 2, 2),
 ('KJA', '2017-07-22', 1, 1),
 ('KJA', '2017-07-29', 2, 2),
 ('KJA', '2017-08-05', 2, 2),
 ('KJA', '2017-08-05', 1, 1),
 ('KJA', '2017-07-17', 1, 1),
 ('KJA', '2017-08-04', 1, 1),
 ('KJA', '2017-08-09', 1, 1),
 ('KJA', '2017-07-29', 1, 1),
 ('KJA', '2017-08-02', 1, 1),
 ('KJA', '2017-08-06', 1, 1),
 ('KJA', '2017-08-12', 3, 5),
 ('KJA', '2017-07-18', 3, 5),
 ('KJA', '2017-07-26', 1, 1),
 ('KJA', '2017-07-23', 1, 1),
 ('KJA', '2017-08-13', 2, 2),
 ('KJA', '2017-07-17', 1, 1),
 ('KJA', '2017-08-14', 3, 5),
 ('KJA', '2017-07-16', 1, 1),
 ('KJA', '2017-08-13', 1, 1),
 ('KJA', '2017-07-28', 1, 1),
 ('KJA', '2017-07-17', 1, 1),
 ('KJA', '2017-08-01', 1, 1),
 ('KJA', '2017-08-04', 1, 1),
 ('KJA', '2017-08-02', 1, 1),
 ('KJA', '2017-08-03', 1, 1),
 ('KJA', '2017-07-22', 2, 2),
 ('KJA', '

**Answer:** 

## Question 2.
Often, a small number of tasks in a stage are left with the resource-intensive data processing. In such cases, repartitioning the data, where possible, can improve performance. Currently, the RDD ``region_hour_counts`` is split into 50 partitions based on the airport region component of the key.

```python
num_partitions = 50
def region_hour_partitioner(key):
    region, date, hour = key
    return portable_hash(region) % num_partitions
```

Change the following cell to implement a different partitioning strategy such that the semantics of the program remain unchanged, but the job's runtime is reduced. You are **not allowed to use more than 50 partitions**. **Justify your changes in detail** by explaining:
- Your observations from the original version,
- What you learned from them,
- What led you to design the solution you chose.
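
When gathering observations, it can help to see how a partition function spreads keys before running the full job. The sketch below is a pure-Python simulation under stated assumptions: `stable_hash` is a stand-in for `portable_hash`, the keys are synthetic, and `partition_by_full_key` is just one hypothetical alternative for comparison, not necessarily the best choice for this workload.

```python
import zlib
from collections import Counter
from itertools import product

NUM_PARTITIONS = 50


def stable_hash(value):
    # Assumption: a deterministic stand-in for pyspark.rdd.portable_hash
    return zlib.crc32(repr(value).encode("utf-8"))


def partition_by_region(key):
    region, date, hour = key
    return stable_hash(region) % NUM_PARTITIONS


def partition_by_full_key(key):
    # Hypothetical alternative: hash the whole (region, date, hour) tuple
    return stable_hash(key) % NUM_PARTITIONS


# Synthetic keys: 10 made-up regions x 14 days x 24 hours
regions = [f"R{i:02d}" for i in range(10)]
dates = [f"2017-08-{d:02d}" for d in range(1, 15)]
hours = [f"{h:02d}" for h in range(24)]
keys = list(product(regions, dates, hours))


def partition_sizes(partitioner):
    """Map partition index -> number of keys routed to it."""
    return Counter(partitioner(k) for k in keys)


by_region = partition_sizes(partition_by_region)
by_full_key = partition_sizes(partition_by_full_key)
print("non-empty partitions (region only):", len(by_region))
print("non-empty partitions (full key):   ", len(by_full_key))
```

On the actual RDD, something like `region_hour_counts.glom().map(len).collect()` should report the real number of records per partition. Note that an even record count does not by itself guarantee even work, since the cost of each record depends on its landing count.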

In [25]:
# Edit the following partition strategy
from pyspark.rdd import portable_hash

num_partitions = 50 
def region_hour_partitioner(key):
    # IMPLEMENT THE PARTITION FUNCTION HERE
    raise NotImplementedError("partition function not implemented yet")



region_airport_map = top_airports_rdd.cartesian(airports_rdd) \
    .filter(lambda pair: haversine(pair[0][1][0], pair[0][1][1], pair[1][1][0], pair[1][1][1]) <= X) \
    .map(lambda pair: (pair[0][0], pair[1][0]))
airport_to_regions = region_airport_map.map(lambda x: (x[1], x[0])) \
    .groupByKey().mapValues(set)

flights_rdd.join(airport_to_regions) \
           .flatMap(assign_to_regions) \
           .reduceByKey(lambda x, y: x + y) \
           .partitionBy(num_partitions, region_hour_partitioner) \
           .map(lambda x: (x[0][0], x[0][1], x[1], sequence(x[1]))) \
           .collect()




## Question 3.

If the input column(s) to a UDF contain many duplicate values, performance can improve if the UDF is called only once per distinct input value rather than once per row. For example, in the previous question, the ``haversine`` function might be called multiple times on identical (lat, lon) pairs, repeating the same computation each time. Your task is to implement *in-memory UDF caching*, also known as memoization. Here is one example:

```python 
cache = {}

def square(x):
    if x in cache:
        return cache[x]
    result = x * x
    cache[x] = result
    return result

print(square(4))  # Computes 4*4 = 16 and stores it
print(square(4))  # Returns 16 from cache, doesn't recompute
print(square(5))  # Computes 5*5 = 25 and stores it
```
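
For comparison, Python's standard library offers the same pattern through `functools.lru_cache`. The sketch below is an alternative to the hand-written dictionary above, not a required approach. It assumes the function's arguments are hashable.

```python
from functools import lru_cache


@lru_cache(maxsize=None)  # unbounded cache keyed on the function arguments
def square(x):
    return x * x


print(square(4))            # computed and stored
print(square(4))            # served from the cache
print(square.cache_info())  # hit/miss counters maintained by lru_cache
```

Whichever variant is used, keep in mind that in Spark the function runs inside Python worker processes on the executors, so each worker is expected to build up its own cache rather than share one with the driver.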

### Question 3.a 
Implement *in-memory UDF caching* in the function ``haversine_cache`` located in the code cell below. Comment out the existing implementation.

Rerun the job and report the performance improvements, if any. **Justify your changes in detail** by explaining:
- Your observations from the original version,
- What you learned from them,
- What led you to design the solution you chose.


Hint: Identify the stage where this function executes and inspect the tasks related to that stage. Do not forget to rerun the cell where you have made edits. 
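
To report a before/after comparison, a simple wall-clock measurement on the driver can complement the per-stage and per-task numbers in the Spark UI. The `timed` helper below is a hypothetical utility, shown here with a stand-in workload. In the notebook, the callable could wrap the job's final `.collect()`.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


# Stand-in workload; replace with e.g. `lambda: some_rdd.collect()` in the notebook
result, seconds = timed(lambda: sum(i * i for i in range(100_000)))
print(f"result={result}, elapsed={seconds:.4f}s")
```

Running each variant more than once helps separate one-off costs from the effect of the change itself.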

In [None]:
# Implement the haversine_cache function here
def haversine_cache(lat1, lon1, lat2, lon2):
    raise NotImplementedError("haversine_cache not implemented yet")

region_airport_map = top_airports_rdd.cartesian(airports_rdd) \
    .filter(lambda pair: haversine_cache(pair[0][1][0], pair[0][1][1], pair[1][1][0], pair[1][1][1]) <= X) \
    .map(lambda pair: (pair[0][0], pair[1][0]))
airport_to_regions = region_airport_map.map(lambda x: (x[1], x[0])) \
    .groupByKey().mapValues(set)

flights_rdd.join(airport_to_regions) \
           .flatMap(assign_to_regions) \
           .filter(lambda x: 12 <= int(x[0][2]) < 20) \
           .reduceByKey(lambda x, y: x + y) \
           .partitionBy(num_partitions, region_hour_partitioner) \
           .map(lambda x: (x[0][0], x[0][1], x[1], sequence(x[1]))) \
           .collect()

**Answer:** 

### Question 3.b 
Implement *in-memory UDF caching* in the function ``sequence_cache`` located in the cell below. Rerun the job below and report the performance improvements, if any. **Justify your changes in detail** by explaining:
- Your observations from the original version,
- What you learned from them,
- What led you to design the solution you chose.

In [None]:
# Implement your sequence_cache function here
def sequence_cache(x):
    raise NotImplementedError("sequence_cache not implemented yet")

region_airport_map = top_airports_rdd.cartesian(airports_rdd) \
    .filter(lambda pair: haversine(pair[0][1][0], pair[0][1][1], pair[1][1][0], pair[1][1][1]) <= X) \
    .map(lambda pair: (pair[0][0], pair[1][0]))
airport_to_regions = region_airport_map.map(lambda x: (x[1], x[0])) \
    .groupByKey().mapValues(set)

flights_rdd.join(airport_to_regions) \
           .flatMap(assign_to_regions) \
           .reduceByKey(lambda x, y: x + y) \
           .partitionBy(num_partitions, region_hour_partitioner) \
           .map(lambda x: (x[0][0], x[0][1], x[1], sequence_cache(x[1]))) \
           .collect()

**Answer:** 