In [None]:
import csv
import os
from datetime import datetime, timedelta

import pytz
import requests
from pyspark.sql import Row
from pyspark.sql.types import StringType, StructField, StructType

In [None]:
# Prefer a key injected through the AEROAPI_KEY environment variable so the
# real credential never has to be saved inside the notebook; the literal is
# only a fallback placeholder used when the variable is not set.
API_KEY = os.environ.get('AEROAPI_KEY', '***')
BASE_URL = 'https://aeroapi.flightaware.com/aeroapi'

# Header set attached to every request made against BASE_URL.
headers = {
    'x-apikey': API_KEY
}

In [None]:
# Wall-clock "now" in Malaysia; every value below is derived from it.
malaysia_tz = pytz.timezone('Asia/Kuala_Lumpur')
malaysia_time = datetime.now(malaysia_tz)

# Edges of the query window. NOTE: despite the "one"/"two" in the variable
# names, the offsets actually applied are two and three hours.
malaysia_time_minus_one_hour = malaysia_time + timedelta(hours=-2)
malaysia_time_minus_two_hour = malaysia_time + timedelta(hours=-3)

# Render both edges truncated to the top of the hour (the format string
# hard-codes minutes and seconds as zero). The earlier edge is the start.
start_time_myt, end_time_myt = (
    edge.strftime('%Y-%m-%dT%H:00:00')
    for edge in (malaysia_time_minus_two_hour, malaysia_time_minus_one_hour)
)

# Airport identifier used in the departures/arrivals endpoint paths below.
airport_code = 'WMKK'

TRANSFORMATION DEPARTURE

In [None]:
def sanitize_filename(filename):
    """Return ``filename`` with every ':' swapped for '_'.

    Only colons are touched; all other characters pass through unchanged.
    """
    # Splitting on the colon and re-joining with an underscore replaces
    # each occurrence, including leading, trailing and repeated ones.
    return '_'.join(filename.split(':'))

def convert_myt_to_utc(myt_time_str):
    """Shift a Malaysia-time timestamp string back by eight hours.

    Args:
        myt_time_str: Timestamp formatted as ``YYYY-MM-DDTHH:MM:SS``.

    Returns:
        The shifted time as an ISO-8601 string (no timezone suffix).

    Raises:
        ValueError: If ``myt_time_str`` does not match the expected format.
    """
    parsed_local = datetime.strptime(myt_time_str, '%Y-%m-%dT%H:%M:%S')
    # Adding a negative eight-hour delta applies the fixed MYT -> UTC offset.
    return (parsed_local + timedelta(hours=-8)).isoformat()

def convert_utc_to_myt(utc_time_str):
    """Shift a UTC timestamp string forward by eight hours (Malaysia time).

    Args:
        utc_time_str: Timestamp formatted as ``YYYY-MM-DDTHH:MM:SSZ``, or a
            missing-value marker (``None``, ``''`` or ``'N/A'``).

    Returns:
        The shifted time formatted as ``YYYY-MM-DD HH:MM:SS``, or ``None``
        when the input is one of the missing-value markers.

    Raises:
        ValueError: If a non-missing string does not match the expected
            format.
    """
    # Callers in this notebook pass the result of ``dict.get(key, 'N/A')``,
    # which yields 'N/A' for an absent key and None for a key whose value is
    # null. Neither can be parsed, so report them as "no timestamp" instead
    # of crashing inside strptime.
    if utc_time_str is None or utc_time_str in ('', 'N/A'):
        return None

    utc_time = datetime.strptime(utc_time_str, '%Y-%m-%dT%H:%M:%SZ')
    myt_time = utc_time + timedelta(hours=8)
    return myt_time.strftime('%Y-%m-%d %H:%M:%S')

def get_recent_departures(airport_code, start_time_myt, end_time_myt, timeout=30):
    """Request the departures of an airport for a Malaysia-time window.

    Args:
        airport_code: Airport identifier placed in the endpoint path.
        start_time_myt: Window start, ``YYYY-MM-DDTHH:MM:SS`` in Malaysia time.
        end_time_myt: Window end, same format as ``start_time_myt``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise the raw response text so the caller can report the error.

    Raises:
        requests.exceptions.RequestException: On connection failures or when
            ``timeout`` elapses.
    """
    # The API is queried in UTC, marked by the trailing 'Z'.
    start_time_iso = convert_myt_to_utc(start_time_myt) + 'Z'
    end_time_iso = convert_myt_to_utc(end_time_myt) + 'Z'

    endpoint = f'{BASE_URL}/airports/{airport_code}/flights/departures'
    params = {
        'start': start_time_iso,
        'end': end_time_iso
    }

    response = requests.get(endpoint, headers=headers, params=params, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    # Non-200: hand back the body text; callers tell the cases apart by type.
    return response.text

def convert_to_dataframe(spark, data):
    """Build a Spark DataFrame of departures from a decoded API response.

    Args:
        spark: Object providing ``createDataFrame`` (the Spark session).
        data: Mapping whose ``'departures'`` entry is a list of flight dicts.
            A missing or null entry produces an empty DataFrame.

    Returns:
        A DataFrame with eight nullable string columns: flight_id,
        flight_number, aircraft_type, scheduled_departure_myt,
        actual_departure_myt, origin, destination, gate_destination.
        Timestamps that are absent or null in the response become nulls.
    """
    def _airport_name(flight, key):
        # ``dict.get(key, {})`` only falls back when the key is absent; a key
        # that is present with a None value would return None and the chained
        # ``.get`` would raise AttributeError. ``or {}`` covers both cases.
        return (flight.get(key) or {}).get('name', 'N/A')

    def _to_myt(timestamp):
        # Absent or null timestamps cannot be parsed; store them as nulls.
        if not timestamp or timestamp == 'N/A':
            return None
        return convert_utc_to_myt(timestamp)

    # Explicit schema so column types do not depend on inference from rows
    # (inference fails when a column holds only nulls).
    schema = StructType([
        StructField("flight_id", StringType(), True),
        StructField("flight_number", StringType(), True),
        StructField("aircraft_type", StringType(), True),
        StructField("scheduled_departure_myt", StringType(), True),
        StructField("actual_departure_myt", StringType(), True),
        StructField("origin", StringType(), True),
        StructField("destination", StringType(), True),
        StructField("gate_destination", StringType(), True)
    ])

    # One Row per flight, with fields in the same order as the schema.
    rows = [
        Row(
            flight_id=flight.get('ident', 'N/A'),
            flight_number=flight.get('flight_number', 'N/A'),
            aircraft_type=flight.get('aircraft_type', 'N/A'),
            scheduled_departure_myt=_to_myt(flight.get('scheduled_off')),
            actual_departure_myt=_to_myt(flight.get('actual_off')),
            origin=_airport_name(flight, 'origin'),
            destination=_airport_name(flight, 'destination'),
            gate_destination=flight.get('gate_destination', 'N/A')
        )
        for flight in (data.get('departures') or [])
    ]

    return spark.createDataFrame(rows, schema=schema)

# Pull the departures for the configured airport and time window.
recent_departures = get_recent_departures(airport_code, start_time_myt, end_time_myt)

# A dict means the request succeeded and returned JSON; anything else is the
# raw error text, which is printed instead of being converted.
if not isinstance(recent_departures, dict):
    print("Error fetching data:", recent_departures)
else:
    df_departures = convert_to_dataframe(spark, recent_departures)
    df_departures.show()  # Preview the resulting rows


+---------+-------------+-------------+-----------------------+--------------------+------------------+--------------------+----------------+
|flight_id|flight_number|aircraft_type|scheduled_departure_myt|actual_departure_myt|            origin|         destination|gate_destination|
+---------+-------------+-------------+-----------------------+--------------------+------------------+--------------------+----------------+
|  MAS2634|         2634|        B38M |    2024-09-03 14:50:00| 2024-09-03 14:59:07|Kuala Lumpur Int'l|         RMAF Labuan|            NULL|
|   AXM886|          886|        A320 |    2024-09-03 14:30:00| 2024-09-03 14:56:42|Kuala Lumpur Int'l|Don Muang Int'l (...|            NULL|
|   XAX522|          522|        A333 |    2024-09-03 14:35:00| 2024-09-03 14:53:54|Kuala Lumpur Int'l|Tokyo Int'l (Haneda)|            NULL|
|   TNU674|          674|        A320 |    2024-09-03 14:40:00| 2024-09-03 14:53:51|Kuala Lumpur Int'l|Jakarta-Soekarno-...|            NULL|
|   AX

SAVE FILES DEPARTURE

In [None]:
# The window start doubles as the batch label; strip the colons so it can be
# used inside folder and file names.
sanitized_start_time = sanitize_filename(start_time_myt)

# Output folder for this batch and the final file name inside it.
folder_path_departures = f'/mnt/raw/departures/departures_{sanitized_start_time}/'
corrected_file_path_departures = f"{folder_path_departures}departures_{sanitized_start_time}.parquet"

# Collapse to one partition, then write the Parquet output, replacing any
# previous output at the same path.
df_departures_coalesced = df_departures.coalesce(1)
df_departures_coalesced.write.mode('overwrite').parquet(folder_path_departures)

# Look at what was written and pick the first "part-" file, if any.
files_departures = dbutils.fs.ls(folder_path_departures)
part_file_departures = next(
    (entry for entry in files_departures if entry.name.startswith("part-")),
    None,
)

# Give that part file the batch-specific name (a move within the same folder).
if part_file_departures is not None:
    dbutils.fs.mv(part_file_departures.path, corrected_file_path_departures)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-189052836661557>, line 68[0m
[1;32m     66[0m [38;5;66;03m# Check if the data is in JSON format[39;00m
[1;32m     67[0m [38;5;28;01mif[39;00m [38;5;28misinstance[39m(recent_departures, [38;5;28mdict[39m):
[0;32m---> 68[0m     df_departures [38;5;241m=[39m convert_to_dataframe(spark, recent_departures)
[1;32m     69[0m     df_departures[38;5;241m.[39mshow()  [38;5;66;03m# Display the DataFrame[39;00m
[1;32m     70[0m [38;5;28;01melse[39;00m:

File [0;32m<command-189052836661557>, line 45[0m, in [0;36mconvert_to_dataframe[0;34m(spark, data)[0m
[1;32m     33[0m schema [38;5;241m=[39m StructType([
[1;32m     34[0m     StructField([38;5;124m"[39m[38;5;124mflight_id[39m[38;5;124m"[39m, StringType(), [38;5;28;01mTrue[39;00m),
[1;32m     35[0m     StructF

TRANSFORMATION ARRIVAL

In [None]:
def get_recent_arrivals(airport_code, start_time_myt, end_time_myt=None, timeout=30):
    """Request the arrivals of an airport for a Malaysia-time window.

    Args:
        airport_code: Airport identifier placed in the endpoint path.
        start_time_myt: Window start, ``YYYY-MM-DDTHH:MM:SS`` in Malaysia time.
        end_time_myt: Window end, same format. When omitted, the notebook
            level variable ``end_time_myt`` is used, which is what this
            function always did before the parameter existed.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise the raw response text so the caller can report the error.

    Raises:
        KeyError: If ``end_time_myt`` is omitted and no notebook-level
            ``end_time_myt`` exists.
        requests.exceptions.RequestException: On connection failures or when
            ``timeout`` elapses.
    """
    if end_time_myt is None:
        # Backward compatibility with two-argument callers: the parameter
        # shadows the notebook-level name, so fetch that one explicitly.
        end_time_myt = globals()['end_time_myt']

    # The API is queried in UTC, marked by the trailing 'Z'.
    start_time_iso = convert_myt_to_utc(start_time_myt) + 'Z'
    end_time_iso = convert_myt_to_utc(end_time_myt) + 'Z'

    endpoint = f'{BASE_URL}/airports/{airport_code}/flights/arrivals'
    params = {
        'start': start_time_iso,
        'end': end_time_iso
    }

    response = requests.get(endpoint, headers=headers, params=params, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    # Non-200: hand back the body text; callers tell the cases apart by type.
    return response.text

def convert_to_dataframe(spark, data):
    """Build a Spark DataFrame of arrivals from a decoded API response.

    NOTE: this definition reuses the name of the departures converter defined
    earlier in the notebook and replaces it once this cell has run. The name
    is kept because the call below depends on it.

    Args:
        spark: Object providing ``createDataFrame`` (the Spark session).
        data: Mapping whose ``'arrivals'`` entry is a list of flight dicts.
            A missing or null entry produces an empty DataFrame.

    Returns:
        A DataFrame with eight nullable string columns: flight_id,
        flight_number, aircraft_type, scheduled_arrival_myt,
        actual_arrival_myt, origin, destination, gate_destination.
        Timestamps that are absent or null in the response become nulls.
    """
    def _airport_name(flight, key):
        # ``dict.get(key, {})`` only falls back when the key is absent; a key
        # that is present with a None value would return None and the chained
        # ``.get`` would raise AttributeError. ``or {}`` covers both cases.
        return (flight.get(key) or {}).get('name', 'N/A')

    def _to_myt(timestamp):
        # Absent or null timestamps cannot be parsed; store them as nulls.
        if not timestamp or timestamp == 'N/A':
            return None
        return convert_utc_to_myt(timestamp)

    # Explicit schema: without it Spark infers types from the rows, which
    # fails for an empty list or a column that holds only nulls.
    schema = StructType([
        StructField("flight_id", StringType(), True),
        StructField("flight_number", StringType(), True),
        StructField("aircraft_type", StringType(), True),
        StructField("scheduled_arrival_myt", StringType(), True),
        StructField("actual_arrival_myt", StringType(), True),
        StructField("origin", StringType(), True),
        StructField("destination", StringType(), True),
        StructField("gate_destination", StringType(), True)
    ])

    # One Row per flight, with fields in the same order as the schema.
    rows = [
        Row(
            flight_id=flight.get('ident', 'N/A'),
            flight_number=flight.get('flight_number', 'N/A'),
            aircraft_type=flight.get('aircraft_type', 'N/A'),
            scheduled_arrival_myt=_to_myt(flight.get('scheduled_on')),
            actual_arrival_myt=_to_myt(flight.get('actual_on')),
            origin=_airport_name(flight, 'origin'),
            destination=_airport_name(flight, 'destination'),
            gate_destination=flight.get('gate_destination', 'N/A')
        )
        for flight in (data.get('arrivals') or [])
    ]

    return spark.createDataFrame(rows, schema=schema)

# Pull the arrivals for the configured airport, starting at the window start.
recent_arrivals = get_recent_arrivals(airport_code, start_time_myt)

# A dict means the request succeeded and returned JSON; anything else is the
# raw error text, which is printed instead of being converted.
if not isinstance(recent_arrivals, dict):
    print("Error fetching data:", recent_arrivals)
else:
    df_arrivals = convert_to_dataframe(spark, recent_arrivals)
    df_arrivals.show()  # Preview the resulting rows


+---------+-------------+-------------+---------------------+-------------------+--------------------+------------------+----------------+
|flight_id|flight_number|aircraft_type|scheduled_arrival_myt| actual_arrival_myt|              origin|       destination|gate_destination|
+---------+-------------+-------------+---------------------+-------------------+--------------------+------------------+----------------+
|   MAS741|          741|        B738 |  2024-09-03 15:42:00|2024-09-03 15:59:06|        Yangon Int'l|Kuala Lumpur Int'l|             H10|
|  MXD2103|         2103|        B38M |  2024-09-03 16:05:00|2024-09-03 15:55:12|        Penang Int'l|Kuala Lumpur Int'l|              B2|
|   MXD714|          714|        B738 |  2024-09-03 15:25:00|2024-09-03 15:52:20|         Dubai Int'l|Kuala Lumpur Int'l|             C22|
|   AXM139|          139|        A320 |  2024-09-03 15:43:00|2024-09-03 15:50:08|     Hong Kong Int'l|Kuala Lumpur Int'l|              L4|
|    MAS53|           53|  

SAVE FILES ARRIVALS

In [None]:
# Output folder for this batch and the final file name inside it. Both reuse
# sanitized_start_time, which is set in the departures save cell.
folder_path_arrivals = f'/mnt/raw/arrivals/arrivals_{sanitized_start_time}/'
corrected_file_path_arrivals = f"{folder_path_arrivals}arrivals_{sanitized_start_time}.parquet"

# Collapse to one partition, then write the Parquet output, replacing any
# previous output at the same path.
df_arrivals_coalesced = df_arrivals.coalesce(1)
df_arrivals_coalesced.write.mode('overwrite').parquet(folder_path_arrivals)

# Look at what was written and pick the first "part-" file, if any.
files_arrivals = dbutils.fs.ls(folder_path_arrivals)
part_file_arrivals = next(
    (entry for entry in files_arrivals if entry.name.startswith("part-")),
    None,
)

# Give that part file the batch-specific name (a move within the same folder).
if part_file_arrivals is not None:
    dbutils.fs.mv(part_file_arrivals.path, corrected_file_path_arrivals)