
### This notebook will:

#### 1. Create the JSON files for both (daily) arrivals and departures & store these files in a DBFS location
#### 2. Read the JSON files and ingest them into the 'ADLS Ingestion Layer' as Delta Lake, writing each day's data to its own date-named folder

Note: The %run command is used to load the variables/functions that are common to both the ingest and transform notebooks

In [None]:
%run "./FlightData-commonconfigs"

In [None]:
import requests
import random
import json

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps

In [None]:
####################################### ENVIRONMENT VARIABLES #######################################

# Every value below is read from the Azure Key Vault-backed secret scope.
# The scope name is kept in one place so it cannot drift between lookups.
_FLIGHT_SECRET_SCOPE = "FlightData-scope"

# Pieces of the flight-schedule URL (joined together inside file_creation)
url_first_part = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="url-first-part")
url_last_part = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="url-last-part")

# User-Agent strings that header_user_agent picks from
user_agent_1 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-1")
user_agent_2 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-2")
user_agent_3 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-3")
user_agent_4 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-4")
user_agent_5 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-5")
user_agent_6 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-6")
user_agent_7 = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="user-agent-7")

# Lookup keys passed to file_creation as var1 / var3 for arrivals
var_1_arrivals = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="var-1-arrivals")
var_3_arrivals = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="var-3-arrivals")

# Lookup keys passed to file_creation as var1 / var3 for departures
var_1_departures = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="var-1-departures")
var_3_departures = dbutils.secrets.get(scope=_FLIGHT_SECRET_SCOPE, key="var-3-departures")

In [None]:
# Function to rotate user agents
def header_user_agent():
    """Return request headers carrying one randomly chosen User-Agent string.

    The candidates are the seven ``user_agent_N`` values defined at notebook
    level; drawing one at random on every call rotates the agent between
    requests.

    Returns:
        dict: ``{"User-Agent": <one of the seven configured strings>}``.
    """
    # Pool of user-agents to draw from
    pool = (
        user_agent_1,
        user_agent_2,
        user_agent_3,
        user_agent_4,
        user_agent_5,
        user_agent_6,
        user_agent_7,
    )
    chosen_agent = random.choice(pool)
    return {"User-Agent": chosen_agent}

In [None]:
####################################### (JSON) FILES CREATION #######################################


# Function to create json files for both arrivals and departures - by date
def file_creation(var1, var2, var3, request_timeout=30):
    """Collect yesterday's flights for one direction and dump them to a JSON file in DBFS.

    The schedule endpoint is requested twice (the value interpolated into the
    URL is -1, then -2 - presumably a page index; confirm against the API).
    Every flight whose scheduled time falls inside yesterday, measured as a
    Europe/Malta local day, is flattened into a dict and the resulting list is
    written to ``/dbfs/FileStore/flight_data_{var2}_{yesterday}.json``.

    Args:
        var1: Key read under ``flight["flight"]["time"]["scheduled"]`` and
            ``flight["flight"]["time"]["real"]``.
        var2: Key read under ``pluginData["schedule"]`` to select the flight
            list; also embedded in the output file name.
        var3: Key read under ``flight["flight"]["airport"]`` to select the
            airport whose city and IATA code are recorded.
        request_timeout: Seconds to wait on each HTTP request before giving
            up, so a stalled connection cannot hang the notebook.

    Raises:
        requests.exceptions.HTTPError: If the endpoint answers with an error status.
        requests.exceptions.Timeout: If a request exceeds ``request_timeout``.
    """
    # Resolve the dates once so the epoch window and the output file name
    # cannot disagree if the run straddles midnight.
    today = date.today()
    yesterday = today - timedelta(days=1)

    # Filter for only yesterday's flights.
    # Important to set range of epochs to "Europe/Malta" timezone since time
    # data from source is in this timezone.
    epoch_low = int(ps.to_datetime(yesterday).tz_localize("Europe/Malta").timestamp())
    epoch_high = int(ps.to_datetime(today).tz_localize("Europe/Malta").timestamp())

    # This list accumulates one dict per retained flight
    container = []

    for i in range(1, 3):
        # URL containing arrivals & departures
        flight_data_url = f"{url_first_part}=-{i}&{url_last_part}"

        response = requests.get(
            url=flight_data_url,
            headers=header_user_agent(),
            timeout=request_timeout,
        )
        # Fail here with a clear HTTP error instead of a confusing JSON/KeyError below
        response.raise_for_status()
        flight_data = response.json()

        flights = flight_data["result"]["response"]["airport"]["pluginData"]["schedule"][var2]["data"]

        for flight in flights:
            details = flight["flight"]
            scheduled_time = details["time"]["scheduled"][var1]

            # Half-open window [epoch_low, epoch_high): a flight scheduled exactly
            # at today's midnight belongs to today's file, not yesterday's.
            # A flight with no scheduled time cannot be placed in any day, so skip it.
            if scheduled_time is None or not (epoch_low <= scheduled_time < epoch_high):
                continue

            # These sub-objects can be null in the payload; fall back to None
            number = details["identification"]["number"]
            airline = details["airline"]
            aircraft = details["aircraft"]
            airport = details["airport"][var3]

            container.append(
                {
                    "scheduled_time": scheduled_time,
                    "flight_no": None if number is None else number["default"],
                    "airport_city": airport["position"]["region"]["city"],
                    "airport_code": airport["code"]["iata"],
                    "airline": None if airline is None else airline["name"],
                    "aircraft_type": None if aircraft is None else aircraft["model"]["text"],
                    "aircraft_registration": None if aircraft is None else aircraft["registration"],
                    # Actual time arrived/departed
                    "actual_time": details["time"]["real"][var1],
                }
            )

    # Writing daily flight data to a json file in the Databricks dbfs FileStore
    # folder, named after the previous date
    with open(f"/dbfs/FileStore/flight_data_{var2}_{yesterday}.json", "w") as json_file:
        json.dump(container, json_file)

In [None]:
####################################### INGESTION #######################################

def ingestion(var2_arrivals,var2_departures,storage_account_name,storage_container_name):
    """Load yesterday's arrivals and departures JSON files and save them as one Delta dataset.

    Both files are read from the dbfs FileStore folder with an explicit schema,
    tagged with a ``flight_type`` column, unioned, and written in Delta format
    to ``/mnt/{storage_account_name}/{storage_container_name}/{yesterday}``.

    Args:
        var2_arrivals: Name fragment of the arrivals JSON file.
        var2_departures: Name fragment of the departures JSON file.
        storage_account_name: Storage account used for the mount and the output path.
        storage_container_name: Container used for the mount and the output path.
    """
    # Resolve the date once so both reads and the write target the same day
    # even if the run straddles midnight.
    yesterday = date.today() - timedelta(days=1)

    # Defining the schema of the dataframe
    schema = StructType([
            StructField("scheduled_time", IntegerType(), True),
            StructField("flight_no", StringType(), True),
            StructField("airport_city", StringType(), True),
            StructField("airport_code", StringType(), True),
            StructField("airline", StringType(), True),
            StructField("aircraft_type", StringType(), True),
            StructField("aircraft_registration", StringType(), True),
            StructField("actual_time", IntegerType(), True)
    ])

    # Reading the arrivals JSON file from dbfs FileStore
    arrivals_df = (
        spark.read.option("multiline", "true")
        .schema(schema=schema)
        .json(f"/FileStore/flight_data_{var2_arrivals}_{yesterday}.json")
        .withColumn("flight_type", F.lit('Arrivals'))
    )

    # Reading the departures JSON file from dbfs FileStore
    departures_df = (
        spark.read.option("multiline", "true")
        .schema(schema=schema)
        .json(f"/FileStore/flight_data_{var2_departures}_{yesterday}.json")
        .withColumn("flight_type", F.lit('Departures'))
    )

    # Both frames share the same column order, so a positional union is safe
    df = arrivals_df.union(departures_df)

    # Mount the location named by this function's own arguments, so the mount
    # always matches the path written below (presumably mounts the ADLS
    # container via a service principal - helper is defined outside this notebook).
    mount_adls_using_sp(storage_account_name, storage_container_name)

    # Write dataframe to ADLS (as Delta Lake) in 'ADLS Ingestion Layer'.
    # Each run writes to its own folder named after the date.
    # NOTE(review): no save mode is set, so Spark's default applies - confirm
    # that re-running for an already-ingested date behaves as intended.
    df.write.format("delta").save(f"/mnt/{storage_account_name}/{storage_container_name}/{yesterday}")

In [None]:
# Driver code
# NOTE(review): var_2_arrivals, var_2_departures, storage_account and
# storage_ingestion_container are not defined in this notebook's cells -
# presumably they come from the %run'd common config; confirm.

####################################### (JSON) FILES CREATION #######################################

# Arrivals: create the daily JSON file in the dbfs FileStore folder
file_creation(var1=var_1_arrivals, var2=var_2_arrivals, var3=var_3_arrivals)
# Departures: create the daily JSON file in the dbfs FileStore folder
file_creation(var1=var_1_departures, var2=var_2_departures, var3=var_3_departures)

####################################### INGESTION #######################################

# Ingest both files into the 'ADLS Ingestion Layer' under yesterday's date
ingestion(
    var2_arrivals=var_2_arrivals,
    var2_departures=var_2_departures,
    storage_account_name=storage_account,
    storage_container_name=storage_ingestion_container,
)

In [None]:
# TEST
# Sanity check: load the Delta data written for yesterday and render it.

ingested_path = f"/mnt/{storage_account}/{storage_ingestion_container}/{date.today() - timedelta(days=1)}"
ingested_df = spark.read.format("delta").load(ingested_path)
display(ingested_df)