In [None]:
import csv
import os
from datetime import datetime, timedelta

import pytz
import requests
from pyspark.sql import Row

In [None]:
# AeroAPI credentials and endpoint configuration.
# The key is taken from the AEROAPI_KEY environment variable so a real
# credential never has to be saved inside the notebook. The original
# placeholder remains the fallback, so pasting a key here still works.
API_KEY = os.environ.get('AEROAPI_KEY', 'insert your api')
BASE_URL = 'https://aeroapi.flightaware.com/aeroapi'

# Header dict sent with every requests.get call made by the fetch functions
headers = {
    'x-apikey': API_KEY
}

In [None]:
# Query window, expressed in Malaysia local time
malaysia_tz = pytz.timezone('Asia/Kuala_Lumpur')
malaysia_time = datetime.now(malaysia_tz)

# Step back one and two hours from the current Malaysia time
malaysia_time_minus_one_hour = malaysia_time - timedelta(hours=1)
malaysia_time_minus_two_hour = malaysia_time - timedelta(hours=2)

# The format hard-codes minutes and seconds to 00, so both bounds fall on the hour
hour_format = '%Y-%m-%dT%H:00:00'
start_time_myt = malaysia_time_minus_two_hour.strftime(hour_format)
end_time_myt = malaysia_time_minus_one_hour.strftime(hour_format)

# Airport identifier interpolated into the endpoint path by the fetch functions
airport_code = 'WMKK'

print(start_time_myt, end_time_myt, sep='\n')

2024-09-05T09:00:00
2024-09-05T10:00:00


TRANSFORMATION DEPARTURE

In [None]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import Row

def sanitize_filename(filename):
    """Return ``filename`` with every ':' swapped for '_'."""
    return '_'.join(filename.split(':'))

def convert_myt_to_utc(myt_time_str):
    """Shift a naive 'YYYY-MM-DDTHH:MM:SS' timestamp back by eight hours.

    The input is parsed without timezone information; the fixed eight-hour
    subtraction is what turns the Malaysia-time value into its UTC
    counterpart. The result is returned in ISO 8601 form with no suffix.
    """
    parsed = datetime.strptime(myt_time_str, '%Y-%m-%dT%H:%M:%S')
    return (parsed - timedelta(hours=8)).isoformat()

def convert_utc_to_myt(utc_time_str):
    """Convert a 'YYYY-MM-DDTHH:MM:SSZ' string to 'YYYY-MM-DD HH:MM:SS' plus eight hours.

    Returns:
        'N/A' when the input is falsy or already the literal 'N/A',
        'Invalid Date' when the string does not match the expected format,
        otherwise the shifted timestamp as a string.
    """
    # Missing values short-circuit before any parsing is attempted
    if not utc_time_str or utc_time_str == 'N/A':
        return 'N/A'

    try:
        shifted = datetime.strptime(utc_time_str, '%Y-%m-%dT%H:%M:%SZ') + timedelta(hours=8)
        return shifted.strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        return 'Invalid Date'

def get_recent_departures(airport_code, start_time_myt, end_time_myt, timeout=30):
    """Fetch departures for an airport from the AeroAPI departures endpoint.

    Args:
        airport_code: Airport identifier placed in the endpoint path.
        start_time_myt: Window start as 'YYYY-MM-DDTHH:MM:SS'; it is passed
            through convert_myt_to_utc before being sent.
        end_time_myt: Window end, same format and handling as the start.
        timeout: Seconds handed to ``requests.get``. Without a timeout a
            stalled connection would block the notebook indefinitely.

    Returns:
        The parsed JSON body when the response status is 200; otherwise the
        raw response text, so callers can tell the two apart by type.

    Raises:
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    # The API receives UTC timestamps with a trailing 'Z'
    start_time_iso = convert_myt_to_utc(start_time_myt) + 'Z'
    end_time_iso = convert_myt_to_utc(end_time_myt) + 'Z'

    endpoint = f'{BASE_URL}/airports/{airport_code}/flights/departures'
    params = {
        'start': start_time_iso,
        'end': end_time_iso,
        'max_pages': 10
    }

    response = requests.get(endpoint, headers=headers, params=params, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    return response.text

def convert_to_dataframe_departures(spark, data):
    """Build an all-string Spark DataFrame from a departures payload.

    Args:
        spark: Object whose ``createDataFrame`` is used to build the result.
        data: Mapping whose 'departures' entry is an iterable of flight
            dicts; a missing entry yields an empty DataFrame.

    Returns:
        The DataFrame created from one Row per flight, using the explicit
        eight-column nullable string schema defined below.
    """
    column_names = (
        "flight_id",
        "flight_number",
        "aircraft_type",
        "scheduled_departure_myt",
        "actual_departure_myt",
        "origin",
        "destination",
        "gate_destination",
    )
    # Every column is a nullable string
    schema = StructType([StructField(column, StringType(), True) for column in column_names])

    def _airport_name(flight, key):
        # A missing or falsy airport entry is reported as 'N/A'
        airport = flight.get(key)
        return airport.get('name', 'N/A') if airport else 'N/A'

    rows = []
    for flight in data.get('departures', []):
        rows.append(
            Row(
                flight_id=flight.get('ident', 'N/A'),
                flight_number=flight.get('flight_number', 'N/A'),
                aircraft_type=flight.get('aircraft_type', 'N/A'),
                scheduled_departure_myt=convert_utc_to_myt(flight.get('scheduled_off', 'N/A')),
                actual_departure_myt=convert_utc_to_myt(flight.get('actual_off', 'N/A')),
                origin=_airport_name(flight, 'origin'),
                destination=_airport_name(flight, 'destination'),
                gate_destination=flight.get('gate_destination', 'N/A'),
            )
        )

    return spark.createDataFrame(rows, schema=schema)

# Pull the departures for the configured airport and time window
recent_departures = get_recent_departures(airport_code, start_time_myt, end_time_myt)

# A non-dict result is the raw error text returned for a non-200 response
if not isinstance(recent_departures, dict):
    print("Error fetching data:", recent_departures)
else:
    df_departures = convert_to_dataframe_departures(spark, recent_departures)
    df_departures.show()  # Render the DataFrame in the cell output


+---------+-------------+-------------+-----------------------+--------------------+------------------+--------------------+----------------+
|flight_id|flight_number|aircraft_type|scheduled_departure_myt|actual_departure_myt|            origin|         destination|gate_destination|
+---------+-------------+-------------+-----------------------+--------------------+------------------+--------------------+----------------+
|   MAS740|          740|        B738 |    2024-09-05 09:20:00| 2024-09-05 09:58:07|Kuala Lumpur Int'l|        Yangon Int'l|            NULL|
|  MAS2610|         2610|        B38M |    2024-09-05 09:25:00| 2024-09-05 09:56:39|Kuala Lumpur Int'l| Kota Kinabalu Int'l|            NULL|
|  MAS2574|         2574|        B738 |    2024-09-05 09:05:00| 2024-09-05 09:54:44|Kuala Lumpur Int'l|                Miri|            NULL|
|   XAX378|          378|        A333 |    2024-09-05 09:40:00| 2024-09-05 09:54:14|Kuala Lumpur Int'l|Taiwan Taoyuan Int'l|              B5|
|    M

In [None]:
# Number of rows in the departures DataFrame built from the fetched payload
departure_row = df_departures.count()
print(departure_row)

32


SAVE FILES DEPARTURES

In [None]:
# Flat destination folder for every departures file (no per-run subdirectories)
folder_path_departures = '/mnt/raw/departures/'

# The window start, with ':' replaced, doubles as the identifier in the file name
sanitized_start_time = sanitize_filename(start_time_myt)
unique_identifier = sanitized_start_time

# Final name the written part file is moved to
corrected_file_path_departures = f"{folder_path_departures}departures_{unique_identifier}.parquet"

# Collapse to one partition, then append the Parquet output to the folder
df_departures_coalesced = df_departures.coalesce(1)
df_departures_coalesced.write.mode('append').parquet(folder_path_departures)

# Inspect what the folder holds after the write
files_departures = dbutils.fs.ls(folder_path_departures)

for file in files_departures:
    if file.name.startswith("part-"):
        # Rename the part file to the run-specific name
        dbutils.fs.mv(file.path, corrected_file_path_departures)
        continue
    if file.name.startswith(("_SUCCESS", "_committed", "_started")):
        # Drop the marker files the write leaves behind
        dbutils.fs.rm(file.path)


TRANSFORMATION ARRIVAL

In [None]:
def get_recent_arrivals(airport_code, start_time_myt, end_time_myt, timeout=30):
    """Fetch arrivals for an airport from the AeroAPI arrivals endpoint.

    Args:
        airport_code: Airport identifier placed in the endpoint path.
        start_time_myt: Window start as 'YYYY-MM-DDTHH:MM:SS'; it is passed
            through convert_myt_to_utc before being sent.
        end_time_myt: Window end, same format and handling as the start.
        timeout: Seconds handed to ``requests.get``. Without a timeout a
            stalled connection would block the notebook indefinitely.

    Returns:
        The parsed JSON body when the response status is 200; otherwise the
        raw response text, so callers can tell the two apart by type.

    Raises:
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    # The API receives UTC timestamps with a trailing 'Z'
    start_time_iso = convert_myt_to_utc(start_time_myt) + 'Z'
    end_time_iso = convert_myt_to_utc(end_time_myt) + 'Z'

    endpoint = f'{BASE_URL}/airports/{airport_code}/flights/arrivals'
    params = {
        'start': start_time_iso,
        'end': end_time_iso,
        'max_pages': 10
    }

    response = requests.get(endpoint, headers=headers, params=params, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    return response.text

def convert_to_dataframe_arrivals(spark, data):
    """Build an all-string Spark DataFrame from an arrivals payload.

    Args:
        spark: Object whose ``createDataFrame`` is used to build the result.
        data: Mapping whose 'arrivals' entry is an iterable of flight dicts;
            a missing entry yields an empty DataFrame.

    Returns:
        The DataFrame created from one Row per flight, using the explicit
        eight-column nullable string schema defined below.

    A flight whose 'origin' or 'destination' is present but null (or
    otherwise falsy) gets 'N/A' for that column instead of raising
    AttributeError, which is the same guard
    convert_to_dataframe_departures applies.
    """
    # Define the schema explicitly for arrivals
    schema = StructType([
        StructField("flight_id", StringType(), True),
        StructField("flight_number", StringType(), True),
        StructField("aircraft_type", StringType(), True),
        StructField("scheduled_arrival_myt", StringType(), True),
        StructField("actual_arrival_myt", StringType(), True),
        StructField("origin", StringType(), True),
        StructField("destination", StringType(), True),
        StructField("gate_destination", StringType(), True)
    ])

    # Create a list of Row objects; dict.get's default only covers a missing
    # key, so the nested airport lookups are guarded against explicit nulls
    rows = [
        Row(
            flight_id=flight.get('ident', 'N/A'),
            flight_number=flight.get('flight_number', 'N/A'),
            aircraft_type=flight.get('aircraft_type', 'N/A'),
            scheduled_arrival_myt=convert_utc_to_myt(flight.get('scheduled_on', 'N/A')),
            actual_arrival_myt=convert_utc_to_myt(flight.get('actual_on', 'N/A')),
            origin=flight.get('origin', {}).get('name', 'N/A') if flight.get('origin') else 'N/A',
            destination=flight.get('destination', {}).get('name', 'N/A') if flight.get('destination') else 'N/A',
            gate_destination=flight.get('gate_destination', 'N/A')
        )
        for flight in data.get('arrivals', [])
    ]

    # Convert the list of Rows into a DataFrame with the predefined schema
    df = spark.createDataFrame(rows, schema=schema)
    return df

# Pull the arrivals for the configured airport and time window
recent_arrivals = get_recent_arrivals(airport_code, start_time_myt, end_time_myt)

# A non-dict result is the raw error text returned for a non-200 response
if not isinstance(recent_arrivals, dict):
    print("Error fetching data:", recent_arrivals)
else:
    df_arrivals = convert_to_dataframe_arrivals(spark, recent_arrivals)
    df_arrivals.show()  # Render the DataFrame in the cell output

+---------+-------------+-------------+---------------------+-------------------+--------------------+------------------+----------------+
|flight_id|flight_number|aircraft_type|scheduled_arrival_myt| actual_arrival_myt|              origin|       destination|gate_destination|
+---------+-------------+-------------+---------------------+-------------------+--------------------+------------------+----------------+
|   MAS126|          126|        A333 |  2024-09-05 09:44:00|2024-09-05 09:57:31|         Perth Int'l|Kuala Lumpur Int'l|            NULL|
|   AXM702|          702|        A21N |  2024-09-05 10:05:00|2024-09-05 09:57:19|    Singapore Changi|Kuala Lumpur Int'l|            NULL|
|  AXM5205|         5205|        A320 |  2024-09-05 10:20:00|2024-09-05 09:52:10|       Kuching Int'l|Kuala Lumpur Int'l|            NULL|
|   MAU646|          646|        A332 |  2024-09-05 10:00:00|2024-09-05 09:50:49|Sir Seewoosagur R...|Kuala Lumpur Int'l|             C12|
|   AWQ550|          550|  

SAVE FILES ARRIVALS

In [None]:
# Flat destination folder for every arrivals file (no per-run subdirectories)
folder_path_arrivals = '/mnt/raw/arrivals/'

# The window start, with ':' replaced, doubles as the identifier in the file name
sanitized_start_time = sanitize_filename(start_time_myt)
unique_identifier = sanitized_start_time

# Final name the written part file is moved to
corrected_file_path_arrivals = f"{folder_path_arrivals}arrivals_{unique_identifier}.parquet"

# Collapse to one partition, then append the Parquet output to the folder
df_arrivals_coalesced = df_arrivals.coalesce(1)
df_arrivals_coalesced.write.mode('append').parquet(folder_path_arrivals)

# Inspect what the folder holds after the write
files_arrivals = dbutils.fs.ls(folder_path_arrivals)

for file in files_arrivals:
    if file.name.startswith("part-"):
        # Rename the part file to the run-specific name
        dbutils.fs.mv(file.path, corrected_file_path_arrivals)
        continue
    if file.name.startswith(("_SUCCESS", "_committed", "_started")):
        # Drop the marker files the write leaves behind
        dbutils.fs.rm(file.path)


In [None]:
# Row counts of the two DataFrames; the arrival count is printed first
departure_row = df_departures.count()
arrival_row = df_arrivals.count()
print(arrival_row)
print(departure_row)

14
32
