## Transformations
###### 1. All spaces and underscores should be removed from column names, with each word capitalized. E.g. "roll_no" or "roll no" should become "RollNo".
###### 2. "Departure Date" column: Timestamp to Date.
###### 3. "Gender" column: single-letter denotation. E.g. 'Male' or 'male' becomes 'M'.
###### 4. Create a new column "Country_Continent_Code" as a combination of Airport_Country_Code and Airport_Continent, joined with a hyphen.
###### 5. Filter: keep only the records whose Passenger_ID appears in exactly one row.

In [0]:
# List the dbo folder of the bronze mount.
# The leading slash makes the mount path absolute, matching the other
# path literals used in this notebook.
dbutils.fs.ls("/mnt/airline-data-bronze/dbo/")

[FileInfo(path='dbfs:/mnt/airline-data-bronze/dbo/Airline Data/', name='Airline Data/', size=0, modificationTime=1699355276000)]

In [0]:
# List the root of the silver mount.
# The leading slash makes the mount path absolute, matching the other
# path literals used in this notebook.
dbutils.fs.ls("/mnt/airline-data-silver/")

[FileInfo(path='dbfs:/mnt/airline-data-silver/dbo/', name='dbo/', size=0, modificationTime=1699681142000)]

In [0]:
# Inspect the bronze "Airline Data" folder that holds the source CSV.
airline_data_dir = "/mnt/airline-data-bronze/dbo/Airline Data"
dbutils.fs.ls(airline_data_dir)

[FileInfo(path='dbfs:/mnt/airline-data-bronze/dbo/Airline Data/Airline Data.csv', name='Airline Data.csv', size=16876458, modificationTime=1699355453000)]

In [0]:
# Location of the raw airline extract on the bronze mount.
input_path = "/mnt/airline-data-bronze/dbo/Airline Data/Airline Data.csv"

# Read the CSV, taking column names from the header row and letting Spark
# infer each column's type from the data.
airline_df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load(input_path)
)

In [0]:
# Show the number of rows loaded from the CSV as a sanity check.
airline_row_count = airline_df.count()
display(airline_row_count)

98619

In [0]:
import re


def _to_pascal_case(column_name):
    """Return ``column_name`` with spaces/underscores removed and each word capitalised.

    Only the first character of each word is upper-cased; the remaining
    characters are left untouched, so "Passenger_ID" becomes "PassengerID"
    and "roll no" becomes "RollNo".  A name consisting solely of separators
    is returned unchanged rather than collapsing to an empty string.
    """
    words = [word for word in re.split(r"[ _]+", column_name) if word]
    if not words:
        return column_name
    return "".join(word[0].upper() + word[1:] for word in words)


# Transformation 1: normalise every column name programmatically instead of
# listing each rename by hand, so columns spelled with spaces (or columns
# added to the source later) are handled as well.
renamed_columns = [_to_pascal_case(name) for name in airline_df.columns]

# Two distinct source names can collapse to the same result (e.g. "a_b" and
# "a b"); fail loudly instead of producing a frame with ambiguous columns.
if len(set(renamed_columns)) != len(renamed_columns):
    raise ValueError(
        "Column rename produces duplicate names: " + ", ".join(renamed_columns)
    )

# toDF assigns the new names positionally, keeping column order and data.
airline_df_1 = airline_df.toDF(*renamed_columns)

In [0]:
from pyspark.sql.functions import to_date, col

# Transformation 2: overwrite DepartureDate with a date value, using the
# day-month-year pattern below when parsing.
airline_df_2 = airline_df_1.withColumn(
    "DepartureDate",
    to_date(col("DepartureDate"), format="dd-MM-yyyy"),
)

In [0]:
# Register the frame as a temp view so the %sql cells can query it by name.
airline_df_2.createOrReplaceTempView(name="airline_trans_2")

In [0]:
%sql
-- Transformation 3: reduce Gender to a single upper-case letter
-- (e.g. 'Male' or 'male' -> 'M'). All other columns pass through unchanged.
drop table if exists airline_trans_3;

create table airline_trans_3 as
select
  PassengerID,
  FirstName,
  LastName,
  -- upper() is needed so lower-case inputs such as 'male' also yield 'M';
  -- substring positions are 1-based, so the first character starts at 1.
  upper(substring(Gender, 1, 1)) as Gender,
  Age,
  Nationality,
  AirportName,
  AirportCountryCode,
  CountryName,
  AirportContinent,
  Continents,
  DepartureDate,
  ArrivalAirport,
  PilotName,
  FlightStatus
from airline_trans_2;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Transformation 4: add Country_Continent_Code, built from the airport
-- country code and continent joined by a hyphen, ahead of the existing columns.
drop table if exists airline_trans_4;

create table airline_trans_4 as
select
  concat(AirportCountryCode, '-', AirportContinent) as Country_Continent_Code,
  *
from airline_trans_3;


num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Transformation 5: keep only rows whose PassengerID occurs exactly once.
-- The helper column cnt is retained in the resulting table.
drop table if exists airline_trans_5;

create table airline_trans_5 as
with passenger_row_counts as (
  -- cnt = number of rows sharing this row's PassengerID
  select
    count(*) over (partition by PassengerID) as cnt,
    *
  from airline_trans_4
)
select *
from passenger_row_counts
where cnt = 1
order by PassengerID;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Upsert airline_trans_5 into the silver table, keyed on PassengerID:
-- matched rows have every non-key column overwritten, unmatched rows are inserted.
merge into silver_dbo_db.airline_data_silver as tgt
using airline_trans_5 as src
  on tgt.PassengerID = src.PassengerID
when matched then
  update set
    tgt.FirstName              = src.FirstName,
    tgt.LastName               = src.LastName,
    tgt.Gender                 = src.Gender,
    tgt.Age                    = src.Age,
    tgt.Nationality            = src.Nationality,
    tgt.AirportName            = src.AirportName,
    tgt.Country_Continent_Code = src.Country_Continent_Code,
    tgt.AirportCountryCode     = src.AirportCountryCode,
    tgt.CountryName            = src.CountryName,
    tgt.AirportContinent       = src.AirportContinent,
    tgt.Continents             = src.Continents,
    tgt.DepartureDate          = src.DepartureDate,
    tgt.ArrivalAirport         = src.ArrivalAirport,
    tgt.PilotName              = src.PilotName,
    tgt.FlightStatus           = src.FlightStatus
when not matched then
  insert (
    PassengerID,
    FirstName,
    LastName,
    Gender,
    Age,
    Nationality,
    AirportName,
    Country_Continent_Code,
    AirportCountryCode,
    CountryName,
    AirportContinent,
    Continents,
    DepartureDate,
    ArrivalAirport,
    PilotName,
    FlightStatus
  )
  values (
    src.PassengerID,
    src.FirstName,
    src.LastName,
    src.Gender,
    src.Age,
    src.Nationality,
    src.AirportName,
    src.Country_Continent_Code,
    src.AirportCountryCode,
    src.CountryName,
    src.AirportContinent,
    src.Continents,
    src.DepartureDate,
    src.ArrivalAirport,
    src.PilotName,
    src.FlightStatus
  )


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
32876,0,0,32876
