
- <b>Author:</b> Juan David Escobar Escobar
- <b>LinkedIn:</b> <a href="https://www.linkedin.com/in/jdescobar/">https://www.linkedin.com/in/jdescobar/</a>


1. Build a string (chain) of flights, i.e. the sequence of next flights an aircraft operates, based on the turn of the aircraft.
2. Build a dataset that shows, for every 15-minute interval, how many flights came in to and went out of each airport during the preceding 2 hours.


# Solution

In [0]:
from pyspark.sql.functions import col, to_timestamp, expr, to_date, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import  monotonically_increasing_id


# Settings shared by both tasks:
#   time_interval_min   - length of one time-series slot, in minutes
#   time_interval_hours - Spark SQL expression evaluated against the `tms` column
#   tms_format          - strptime pattern of the timestamp strings in the mock data
_config_variables = dict(
    time_interval_min=15,
    time_interval_hours='tms - INTERVAL 2 HOURS',
    tms_format='%Y-%m-%dT%H:%M:%S',
)

# Schema of one flight record. Every column is nullable; the four *_lcl_tms
# columns are timestamps and all remaining columns are strings.
_flight_schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ("orig", StringType),
        ("dest", StringType),
        ("id", StringType),
        ("actl_dep_lcl_tms", TimestampType),
        ("actl_arr_lcl_tms", TimestampType),
        ("flight_num", StringType),
        ("flights", StringType),
        ("acft_regs_cde", StringType),
        ("airborne_lcl_tms", TimestampType),
        ("landing_lcl_tms", TimestampType),
        ("next_flight_id", StringType),
    )
])

# Mock flight records. Each tuple follows the column order of _flight_schema:
#   (orig, dest, id, actl_dep_lcl_tms, actl_arr_lcl_tms, flight_num, flights,
#    acft_regs_cde, airborne_lcl_tms, landing_lcl_tms, next_flight_id)
# Timestamps are still strings here, written as '%Y-%m-%dT%H:%M:%S'.
# next_flight_id is pre-filled only for some flights of aircraft '451'; the
# other records carry an empty string (or None for flight '2').
_flight_data = [
    (
        'YYZ', 'YVR', '1', '2022-12-31T20:36:00', '2022-12-31T22:28:00', '127', '1', '737',
        '2022-12-31T21:02:00', '2022-12-31T22:17:00', ''
    ),
    (
        'YYZ', 'YVR', '2', '2022-12-31T19:39:00', '2022-12-31T21:22:00', '185', '1', '451',
        '2022-12-31T20:05:00', '2022-12-31T21:14:00', None
    ),
    (
        'YYZ', 'YVR', '3', '2022-12-31T18:53:00', '2022-12-31T20:33:00', '123', '1', '843',
        '2022-12-31T19:10:00', '2022-12-31T20:22:00', ''
    ),
    (
        'YYZ', 'YVR', '4', '2022-12-31T17:27:00', '2022-12-31T19:00:00', '121', '1', '747',
        '2022-12-31T17:43:00', '2022-12-31T18:53:00', ''
    ),
    (
        'YYZ', 'YVR', '5', '2022-12-31T16:44:00', '2022-12-31T18:31:00', '119', '1', '464',
        '2022-12-31T16:56:00', '2022-12-31T18:23:00', ''
    ),
    (
        'YYZ', 'YVR', '6', '2022-12-31T14:35:00', '2022-12-31T16:43:00', '113', '1', '743',
        '2022-12-31T15:04:00', '2022-12-31T16:35:00', ''
    ),
    (
        'YYZ', 'YVR', '7', '2022-12-31T13:08:00', '2022-12-31T14:53:00', '111', '1', '462',
        '2022-12-31T13:20:00', '2022-12-31T14:46:00', ''
    ),
    (
        'YYZ', 'YVR', '8', '2022-12-31T11:23:00', '2022-12-31T13:03:00', '105', '1', '735',
        '2022-12-31T11:40:00', '2022-12-31T12:54:00', ''
    ),
    (
        'YYZ', 'YVR', '9', '2022-12-31T10:18:00', '2022-12-31T12:27:00', '107', '1', '457',
        '2022-12-31T10:39:00', '2022-12-31T12:08:00', ''
    ),
    (
        'YYZ', 'YVR', '10', '2022-12-31T08:50:00', '2022-12-31T10:38:00', '103', '1', '451',
        '2022-12-31T09:13:00', '2022-12-31T10:28:00', '16'
    ),
    (
        'YVR', 'YYZ', '12', '2022-12-31T16:02:00', '2022-12-31T23:20:00', '120', '1', '462',
        '2022-12-31T16:15:00', '2022-12-31T23:10:00', ''
    ),
    (
        'YVR', 'YYZ', '13', '2022-12-31T14:58:00', '2022-12-31T22:20:00', '116', '1', '735',
        '2022-12-31T15:16:00', '2022-12-31T22:00:00', ''
    ),
    (
        'YVR', 'YYZ', '14', '2022-12-31T14:02:00', '2022-12-31T21:28:00', '118', '1', '457',
        '2022-12-31T14:13:00', '2022-12-31T21:15:00', ''
    ),
    (
        'YVR', 'YYZ', '15', '2022-12-31T12:47:00', '2022-12-31T19:59:00', '114', '1', '738',
        '2022-12-31T13:05:00', '2022-12-31T19:48:00', ''
    ),
    (
        'YVR', 'YYZ', '16', '2022-12-31T11:55:00', '2022-12-31T19:10:00', '106', '1', '451',
        '2022-12-31T12:10:00', '2022-12-31T18:55:00', '2'
    ),
    (
        'YVR', 'YYZ', '17', '2022-12-31T10:08:00', '2022-12-31T17:27:00', '110', '1', '737',
        '2022-12-31T10:23:00', '2022-12-31T17:15:00', ''
    ),
    (
        'YVR', 'YYZ', '18', '2022-12-31T09:48:00', '2022-12-31T16:58:00', '108', '1', '747',
        '2022-12-31T10:02:00', '2022-12-31T16:50:00', ''
    ),
    (
        'YVR', 'YYZ', '19', '2022-12-31T08:07:00', '2022-12-31T15:19:00', '104', '1', '843',
        '2022-12-31T08:24:00', '2022-12-31T15:10:00', ''
    ),
    (
        'YVR', 'YYZ', '20', '2022-12-31T06:04:00', '2022-12-31T13:24:00', '100', '1', '743',
        '2022-12-31T06:23:00', '2022-12-31T13:16:00', ''
    ),
    (
        'YVR', 'YYZ', '21', '2022-12-31T00:29:00', '2022-12-31T08:27:00', '128', '1', '451',
        '2022-12-31T01:01:00', '2022-12-31T08:11:00', '10'
    )
]

 
def my_udf(actl_arr_lcl_tms, dest):
    """
    Look up the id of the reference row that follows an arrival.

    Scans ``flights_reference_broadcast_list.value`` for rows whose element 2
    is greater than ``actl_arr_lcl_tms`` and whose element 1 equals ``dest``,
    and returns element 0 of the match with the smallest element 2 (the
    earliest-listed one on ties).

    NOTE(review): ``flights_reference_broadcast_list`` is not defined in this
    function; it must exist as a module-level object exposing ``.value``
    before this is called. Rows are presumably (id, orig, actl_dep_lcl_tms)
    tuples - TODO confirm against the code that builds the broadcast.

    Args:
        actl_arr_lcl_tms: Lower bound (exclusive) for element 2 of a row.
        dest: Value that element 1 of a row must equal.

    Returns:
        Element 0 of the selected row, or None when no row qualifies.
    """
    matching_rows = [
        row
        for row in flights_reference_broadcast_list.value
        if row[2] > actl_arr_lcl_tms and row[1] == dest
    ]

    # min() returns the first minimal item, which is what a stable sort
    # followed by taking index 0 would yield.
    earliest_row = min(matching_rows, key=lambda row: row[2], default=None)

    return None if earliest_row is None else earliest_row[0]

def _find_next_flight(current_flight, candidate_flights):
    """Return the first candidate that leaves current_flight's destination after it arrived, else None.

    ``candidate_flights`` must be sorted by ``actl_dep_lcl_tms`` ascending so
    that the first match is also the earliest departure.
    """
    arrival_tms = current_flight['actl_arr_lcl_tms']
    if arrival_tms is None:
        # Without an arrival time there is nothing to compare departures with.
        return None

    for candidate in candidate_flights:
        if (candidate['orig'] == current_flight['dest']
                and candidate['actl_dep_lcl_tms'] > arrival_tms):
            return candidate
    return None


def calculate_next_flight(df: DataFrame, acft_regs_cde):
    """
    Print the chain of consecutive flights operated by one aircraft.

    1. Keep only the rows of ``df`` whose "acft_regs_cde" equals the requested
       aircraft and that have a departure time, ordered by "actl_dep_lcl_tms".
    2. Start from the earliest departure.
    3. Repeatedly pick the earliest flight of the same aircraft whose "orig"
       equals the current flight's "dest" and whose "actl_dep_lcl_tms" is
       later than the current flight's "actl_arr_lcl_tms".
    4. Stop when no such flight exists and print the resulting chain.

    The link is always computed from the timestamps; the "next_flight_id"
    column of ``df`` is not read. Only ``df`` is used (no module-level
    DataFrame), so the function works on whatever DataFrame the caller passes.

    Args:
        df (DataFrame): The flight DataFrame.
        acft_regs_cde: Registration code of the aircraft to follow.

    Returns:
        None. The chain is written to stdout.
    """
    # A single aircraft operates few flights, so they are collected once and
    # linked on the driver instead of running Spark jobs in every iteration.
    aircraft_flights = (df.filter(col('acft_regs_cde') == acft_regs_cde)
                          .filter(col('actl_dep_lcl_tms').isNotNull())
                          .orderBy('actl_dep_lcl_tms')
                          .collect())

    if not aircraft_flights:
        print(f'No flights found for Aircraft: {acft_regs_cde}')
        return

    current_flight = aircraft_flights[0]
    flight_chain = [
        f"Start from Flight ID: {current_flight['id']}, Aircraft: {acft_regs_cde}, "
        f"Flight Num: {current_flight['flight_num']}\n\n"
    ]

    # The iteration bound guarantees termination even if inconsistent local
    # timestamps would otherwise make the links form a cycle.
    for _ in range(len(aircraft_flights) - 1):
        next_flight = _find_next_flight(current_flight, aircraft_flights)
        if next_flight is None:
            break

        flight_chain.append(
            f"Flight ID: {next_flight['id']}, Aircraft: {next_flight['acft_regs_cde']}, "
            f"Flight Num: {next_flight['flight_num']}\n"
        )
        current_flight = next_flight

    print(''.join(flight_chain))

def calculate_intervals_in_a_day():
    """
    Return how many configured intervals fit into one 24-hour day.

    The interval length (in minutes) is read from
    ``_config_variables['time_interval_min']``; the quotient is truncated to
    an int.
    """
    minutes_per_day = 60 * 24
    return int(minutes_per_day / _config_variables['time_interval_min'])

def calculate_time_series_df(flight_dates) -> DataFrame:
    """
    Expand every (airport, day start) row into one row per time slot.

    For each input row the first slot is the row's "timestamp" itself and each
    following slot is ``_config_variables['time_interval_min']`` minutes later.
    The number of slots per row comes from ``calculate_intervals_in_a_day()``
    (at least one slot is always produced).

    Args:
        flight_dates (DataFrame): Rows with columns "airport_code_fd" and
            "timestamp" (the first slot of the day for that airport).

    Returns:
        DataFrame: Columns "airport_code_fd" and "tms", one row per slot.
    """
    slot_length = timedelta(minutes=_config_variables['time_interval_min'])
    # The first slot is always emitted, even if the interval exceeds a day.
    slots_per_day = max(calculate_intervals_in_a_day(), 1)

    # Rows are collected once rather than fetched one by one through
    # monotonically_increasing_id(): those ids are unique but NOT consecutive
    # across partitions, so looking up "row_index == i" can find nothing.
    day_starts = flight_dates.select("airport_code_fd", "timestamp").collect()

    result_data = [
        (row["airport_code_fd"], row["timestamp"] + slot * slot_length)
        for row in day_starts
        for slot in range(slots_per_day)
    ]

    if not result_data:
        # Column types cannot be inferred from an empty list, so they are
        # spelled out (assumes string airport codes - TODO confirm).
        return spark.createDataFrame([], "airport_code_fd string, tms timestamp")

    return spark.createDataFrame(result_data, ["airport_code_fd", "tms"])

def get_ds_fifteen_minutes_flights(flights_df) -> None:
    """
    Display, per airport and time slot, the number of departures ("out") and
    arrivals ("in") inside the look-back window that ends at the slot.

    A flight is counted for slot ``tms`` when its timestamp is
    ``> tms_minus_2_hours`` and ``<= tms``, where ``tms_minus_2_hours`` is the
    Spark expression stored in ``_config_variables['time_interval_hours']``.
    Slots are generated by ``calculate_time_series_df`` for every
    (airport, calendar date) on which the airport has an arrival or departure.

    Flights are de-duplicated on (airport, id, timestamp) so that two different
    flights sharing the same minute at one airport are both counted
    (assumes "id" identifies a flight - TODO confirm).

    Args:
        flights_df (DataFrame): The flight DataFrame.

    Returns:
        None. The result is shown with ``display``.
    """
    arrivals_df = (flights_df.select("dest", "id", "actl_arr_lcl_tms")
                             .distinct()
                             .withColumnRenamed("dest", "airport_code")
                             .withColumn("date", to_date(col("actl_arr_lcl_tms"))))

    departures_df = (flights_df.select("orig", "id", "actl_dep_lcl_tms")
                               .distinct()
                               .withColumnRenamed("orig", "airport_code")
                               .withColumn("date", to_date(col("actl_dep_lcl_tms"))))

    # Every (airport, date) pair that has at least one arrival or departure.
    # Rows without a date are dropped because no slot can be derived from them.
    flight_dates = (arrivals_df.select("airport_code", "date")
                               .union(departures_df.select("airport_code", "date"))
                               .filter(col("date").isNotNull())
                               .distinct()
                               .withColumnRenamed("airport_code", "airport_code_fd")
                               # No format argument: a date column is simply cast
                               # to the timestamp at the start of that day.
                               .withColumn("timestamp", to_timestamp(col("date"))))

    time_series_df = calculate_time_series_df(flight_dates)
    time_series_df = time_series_df.withColumn(
        "tms_minus_2_hours", expr(_config_variables['time_interval_hours'])
    )

    # count out: left join keeps slots without departures; count() skips the
    # resulting nulls, so such slots get 0.
    with_out_df = time_series_df.join(
        departures_df,
        (departures_df.airport_code == time_series_df.airport_code_fd) &
        (departures_df.actl_dep_lcl_tms <= time_series_df.tms) &
        (departures_df.actl_dep_lcl_tms > time_series_df.tms_minus_2_hours),
        "left"
    ).groupBy("airport_code_fd", "tms", "tms_minus_2_hours").agg(F.count("actl_dep_lcl_tms").alias("out"))

    # count in: same window logic applied to arrivals.
    result_df = with_out_df.join(
        arrivals_df,
        (arrivals_df.airport_code == with_out_df.airport_code_fd) &
        (arrivals_df.actl_arr_lcl_tms <= with_out_df.tms) &
        (arrivals_df.actl_arr_lcl_tms > with_out_df.tms_minus_2_hours),
        "left"
    ).groupBy("airport_code_fd", "tms", "out").agg(F.count("actl_arr_lcl_tms").alias("in"))

    display(result_df)


if __name__ == "__main__":

    # Positions of the timestamp columns, read from the schema so that the
    # parsing below cannot drift out of sync with it.
    tms_positions = {
        position
        for position, field in enumerate(_flight_schema.fields)
        if isinstance(field.dataType, TimestampType)
    }
    tms_format = _config_variables['tms_format']

    # Convert the timestamp strings of the mock records into datetime objects;
    # every other value is passed through unchanged.
    flight_data = [
        tuple(
            datetime.strptime(value, tms_format) if position in tms_positions else value
            for position, value in enumerate(record)
        )
        for record in _flight_data
    ]

    # Create the DataFrame sorted by departure time.
    # `spark` is the session provided by the notebook environment.
    flights_df = spark.createDataFrame(flight_data, _flight_schema).orderBy('actl_dep_lcl_tms')

    print('TASK 1')
    print('String of flights .i.e. next flights based on a turn of the aircraft: \n')

    # Task 1: print the chain of flights operated by one aircraft
    acft_regs_cde = '451'
    calculate_next_flight(flights_df, acft_regs_cde)

    print('\n')
    print('TASK 2')
    print('Dataset to know in every 15 minutes, how many flights coming in and out of an airport in 2 hours before: \n')

    # Task 2: get_ds_fifteen_minutes_flights
    get_ds_fifteen_minutes_flights(flights_df)

TASK 1
String of flights .i.e. next flights based on a turn of the aircraft: 

Start from Flight ID: 21, Aircraft: 451, Flight Num: 128

Flight ID: 10, Aircraft: 451, Flight Num: 103
Flight ID: 16, Aircraft: 451, Flight Num: 106
Flight ID: 2, Aircraft: 451, Flight Num: 185



TASK 2
Dataset to know in every 15 minutes, how many flights coming in and out of an airport in 2 hours before: 



airport_code_fd,tms,out,in
YYZ,2022-12-31T00:00:00.000+0000,0,0
YYZ,2022-12-31T00:15:00.000+0000,0,0
YYZ,2022-12-31T00:30:00.000+0000,0,0
YYZ,2022-12-31T00:45:00.000+0000,0,0
YYZ,2022-12-31T01:00:00.000+0000,0,0
YYZ,2022-12-31T01:15:00.000+0000,0,0
YYZ,2022-12-31T01:30:00.000+0000,0,0
YYZ,2022-12-31T01:45:00.000+0000,0,0
YYZ,2022-12-31T02:00:00.000+0000,0,0
YYZ,2022-12-31T02:15:00.000+0000,0,0
