#### 1. Specify either the S3 or local path that contains the raw data files in CSV and JSON formats

In [0]:
# Choose where the raw CSV/JSON files are read from:
#   True  -> the S3 bucket below
#   False -> the /raw_data folder hosted locally here in Databricks
use_s3 = True

if use_s3:
    file_path = "s3://raw-air-boltic/"
else:
    file_path = "/raw_data/"

#### 2.a. Load Air Boltic data from CSVs into dataframes

In [0]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType


def read_raw_csv(file_name):
    """Load one raw CSV file from `file_path` into a Spark DataFrame.

    Every column is loaded as a nullable string and blank fields become null, whichever
    source `use_s3` selects, so both branches hand the same schema to the cells below.

    Args:
        file_name: CSV file name relative to `file_path`, e.g. "customer.csv".

    Returns:
        A Spark DataFrame whose column names are the CSV header names.
    """
    path = file_path + file_name
    if use_s3:
        # Spark's CSV reader (no inferSchema) keeps every column as a string and reads
        # blank fields as null.
        return spark.read.option("header", "true").csv(path)

    # Mirror the Spark reader locally. Keep values as strings, because pandas would
    # otherwise infer numeric columns (e.g. a float "Customer Group ID" because of its
    # blanks). Turn NaN into None so Spark stores real nulls that `isNull()` checks can see.
    pdf = pd.read_csv(path, dtype=str)
    pdf = pdf.astype(object).where(pdf.notna(), None)
    schema = StructType([StructField(name, StringType(), True) for name in pdf.columns])
    return spark.createDataFrame(pdf, schema=schema)


df_customer = read_raw_csv("customer.csv")
df_customer_group = read_raw_csv("customer_group.csv")
df_aeroplane = read_raw_csv("aeroplane.csv")
df_order = read_raw_csv("order.csv")
df_trip = read_raw_csv("trip.csv")

#### 2.b. Load aeroplane model data from JSON and flatten it into a dataframe

In [0]:
import json
import pandas as pd


def flatten_aeroplane_models(models_by_manufacturer):
    """Flatten the nested aeroplane-model JSON into one record per model.

    Args:
        models_by_manufacturer: mapping of manufacturer -> {model name -> dict of specs}.

    Returns:
        A list of dicts, each holding "manufacturer", "model" and that model's spec keys.
    """
    return [
        {"manufacturer": manufacturer, "model": model, **specs}
        for manufacturer, models in models_by_manufacturer.items()
        for model, specs in models.items()
    ]


if use_s3:
    # `wholetext=True` returns each file as a single row, so the JSON document does not
    # have to be reassembled from individual lines.
    json_string = spark.read.text(file_path + "aeroplane_model.json", wholetext=True).first().value
    data = json.loads(json_string)
else:
    with open(file_path + "aeroplane_model.json", "r", encoding="utf-8") as f:
        data = json.load(f)

rows = flatten_aeroplane_models(data)
df_aeroplane_model = pd.DataFrame(rows)


#### 3. Explore all the raw data

In [0]:
# Show each raw dataframe as an interactive table, in a fixed order.
for raw_df in (
    df_customer,
    df_customer_group,
    df_trip,
    df_order,
    df_aeroplane,
    df_aeroplane_model,
):
    raw_df.display()

Customer ID,Name,Customer Group ID,Email,Phone Number
1,John Doe,1.0,john.doe@gmail.com,+1-555-123-4567
2,Jane Smith,,jane.smith@yahoo.com,+44 20 7946 0958
3,Alice Johnson,3.0,alice.j@example.com,+49-30-12345678
4,Bob Brown,4.0,bob.brown@outlook.com,+61-2-9876-5432
5,Carol White,,carol.white@company.com,+1 (555) 567-8901
6,David Wilson,6.0,david.wilson@domain.co.uk,+33 1 44 55 66 77
7,Emma Davis,7.0,emma.davis@abc.net,
8,Frank Miller,,frank.m@anothermail.com,+91-22-1234-5678
9,Grace Taylor,9.0,grace.taylor@mailservice.org,+81-3-1234-5678
10,Henry Anderson,10.0,henry.anderson@website.io,+1-555-012-3456


ID,Type,Name,Registry number
1,Company,Bolt,5421524153
2,Company,BigBang,131101002
3,Company,Lofty,1533153
4,Private Group,Mari's friends,
5,Organisation,Non-profit for dog safety,AT452453


Trip ID,Origin City,Destination City,Airplane ID,Start Timestamp,End Timestamp
1,New York,London,1,2024-08-01 14:30:00,2024-08-02 02:00:00
2,Tokyo,Paris,2,2024-08-03 09:00:00,2024-08-03 17:00:00
3,Sydney,Los Angeles,3,2024-08-05 22:00:00,2024-08-05 15:00:00
4,Dubai,Singapore,4,2024-08-07 06:00:00,2024-08-07 10:30:00
5,Berlin,Madrid,5,2024-08-09 16:00:00,2024-08-09 19:00:00
6,Moscow,Hong Kong,6,2024-08-10 11:00:00,2024-08-10 21:00:00
7,Sao Paulo,Miami,7,2024-08-12 13:00:00,2024-08-12 19:30:00
8,Toronto,Chicago,8,2024-08-13 08:30:00,2024-08-13 10:00:00
9,Beijing,San Francisco,9,2024-08-14 19:00:00,2024-08-14 13:00:00
10,Mumbai,Dubai,10,2024-08-15 04:00:00,2024-08-15 06:30:00


Order ID,Customer ID,Trip ID,Price (EUR),Seat No,Status
1,3,10,1200,12A,Finished
2,5,7,1800,8B,Finished
3,8,2,1500,15C,Finished
4,12,8,1300,21D,Finished
5,16,8,700,3E,Finished
6,9,6,2000,9F,Finished
7,2,6,1600,14A,Booked
8,14,2,500,2B,Finished
9,11,6,2100,17C,Booked
10,6,4,900,4D,Finished


Airplane ID,Airplane Model,Manufacturer
1,737-800,Boeing
2,A320neo,Airbus
3,E190-E2,Embraer
4,CRJ900,Bombardier
5,737-800,Boeing
6,G650,Gulfstream
7,737-800,Boeing
8,CRJ900,Bombardier
9,CRJ900,Bombardier
10,787-9,Boeing


manufacturer,model,max_seats,max_weight,max_distance,engine_type
Boeing,737-800,189,79015,2935,CFM56-7B
Boeing,777-300ER,396,351535,7930,GE90-115B
Boeing,787-9,296,254000,7635,GEnx-1B
Airbus,A320neo,194,79000,3700,CFM LEAP-1A
Airbus,A350-900,440,280000,8100,Rolls-Royce Trent XWB
Airbus,A380-800,853,560000,8000,Rolls-Royce Trent 900
Embraer,E190-E2,114,56600,3250,PW1900G
Embraer,E175,88,37500,2200,CF34-8E
Embraer,E195-E2,146,61700,2600,PW1900G
Bombardier,CRJ900,90,38530,1554,CF34-8C5


#### 4. Do some data clean-up

##### 4.a. Replace spaces in column names with underscores and remove parentheses to allow the creation of SQL tables here in Databricks.

###### Do not edit the column names in any other way. Further clean-up will be done in dbt. 

In [0]:
def clean_column_names(df):
    """Return `df` with SQL-friendly column names.

    Spaces are replaced with underscores and parentheses are dropped
    (e.g. "Price (EUR)" -> "Price_EUR"). Nothing else about the names changes;
    further clean-up is left to dbt.
    """
    translation = str.maketrans({" ": "_", "(": None, ")": None})
    new_names = [name.translate(translation) for name in df.columns]
    return df.toDF(*new_names)

# Apply the column renaming to every Spark dataframe loaded from CSV.
df_customer, df_customer_group, df_trip, df_order, df_aeroplane = (
    clean_column_names(raw_df)
    for raw_df in (df_customer, df_customer_group, df_trip, df_order, df_aeroplane)
)

##### 4.b. Fix the customer group ID assignment in the customer dataframe and add a new 'Unknown group' type to the customer group dataframe.

###### This allows us to treat the customer group ID in the customer dataframe as a foreign key later.

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import col, isnan, lit, when
from pyspark.sql.types import DoubleType, FloatType, StringType, StructField, StructType

# Group ID assigned to customers that have no customer group at all.
UNKNOWN_GROUP_ID = "999"
# Group IDs referenced by df_customer but missing from df_customer_group. The assignment of customer
# group IDs 6-10 may be relevant, so they are kept as 'Unknown group' rows rather than dropped.
UNASSIGNED_GROUP_IDS = ["6", "7", "8", "9", "10"]

# Build the new rows as strings, then cast each column to its type in df_customer_group, so the
# positional union below works whether IDs were loaded as strings or inferred as numbers.
unknown_group_schema = StructType(
    [StructField(name, StringType(), True) for name in df_customer_group.columns]
)
new_df = spark.createDataFrame(
    [(group_id, "Unknown group", None, None) for group_id in [UNKNOWN_GROUP_ID] + UNASSIGNED_GROUP_IDS],
    schema=unknown_group_schema,
).select([col(field.name).cast(field.dataType) for field in df_customer_group.schema.fields])

df_customer_group = df_customer_group.union(new_df)

# Point customers without a group at the 'Unknown group' row with UNKNOWN_GROUP_ID.
group_id = col("Customer_Group_ID")
group_id_type = df_customer.schema["Customer_Group_ID"].dataType
is_missing = group_id.isNull()
if isinstance(group_id_type, (DoubleType, FloatType)):
    # Frames built from pandas may carry NaN instead of null for blank cells.
    is_missing = is_missing | isnan(group_id)

df_customer = df_customer.withColumn(
    "Customer_Group_ID",
    when(is_missing, lit(UNKNOWN_GROUP_ID).cast(group_id_type)).otherwise(group_id),
)

##### 5. Create a SQL schema for storing raw Air Boltic data

In [0]:
%sql

-- Schema that holds the raw Air Boltic tables written by the next cell; IF NOT EXISTS makes re-runs a no-op.
CREATE SCHEMA IF NOT EXISTS air_boltic_raw;

##### 6. Create SQL tables from dataframes

In [0]:
# The aeroplane models were flattened into a pandas dataframe, so convert them to Spark first.
spark_df_aeroplane_model = spark.createDataFrame(df_aeroplane_model)

# Raw tables to (re)create in air_boltic_raw, keyed by table name and written in this order.
raw_tables = {
    "customers": df_customer,
    "customer_groups": df_customer_group,
    "orders": df_order,
    "trips": df_trip,
    "aeroplanes": df_aeroplane,
    "aeroplane_models": spark_df_aeroplane_model,
}

for table_name, table_df in raw_tables.items():
    # The raw layer is fully rebuilt on every run. `overwriteSchema` lets the overwrite succeed
    # even when the dataframe's columns differ from the existing table's schema.
    (
        table_df.write.mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"air_boltic_raw.{table_name}")
    )

##### _Unused code block_

While I didn't end up using this step, I wanted to keep it here to explain my thinking around the start and end timestamps in the <a href="https://docs.google.com/spreadsheets/d/1x9F8gAosLABNyGdjaDsAnUQ50bIHp9FKOX1j-7ERBsw/edit?gid=1153973902#gid=1153973902">trips table</a>. 

I wasn't sure whether the timestamps in this table are in UTC or not. Usually on the boarding passes and in the booking information the start time is in the origin city timezone and the end time is in the destination city timezone. So at first, I assumed the same thing here. 

Therefore, I created this cities reference table that can later (in dbt) be used to convert the start and end times to UTC based on the given timezone per city. However, after doing that and then calculating the trip durations, I noticed that some of the durations did not make sense at all considering the distance between the cities. 

So I ended up not using the timezones and simply assumed that the trip start and end times are in UTC. The durations still don't make sense in all cases (e.g. trips 3 and 9 end before they start), but since this is test data for this exercise, I hope that is okay.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Build a city -> IANA timezone reference table that could later be used in dbt to convert
# trip start and end timestamps into UTC.


def distinct_column_values(df, column):
    """Return the distinct values of `column` in Spark dataframe `df` as a Python list."""
    return [row[column] for row in df.select(column).distinct().collect()]


# Unique cities appearing on either end of a trip, used to decide which timezones are needed.
origin_cities = distinct_column_values(df_trip, "Origin_City")
dest_cities = distinct_column_values(df_trip, "Destination_City")
all_cities = set(origin_cities) | set(dest_cities)
# display(all_cities)  # Uncomment as needed to view the list of unique cities

# Hardcoded for now from the cities in `all_cities`. In production this should come from a
# helper library/service that determines the timezone for a given city.
city_timezones = {
    "Amsterdam": "Europe/Amsterdam",
    "Auckland": "Pacific/Auckland",
    "Bangkok": "Asia/Bangkok",
    "Beijing": "Asia/Shanghai",
    "Berlin": "Europe/Berlin",
    "Cape Town": "Africa/Johannesburg",
    "Chicago": "America/Chicago",
    "Dubai": "Asia/Dubai",
    "Frankfurt": "Europe/Berlin",
    "Hong Kong": "Asia/Hong_Kong",
    "Johannesburg": "Africa/Johannesburg",
    "London": "Europe/London",
    "Los Angeles": "America/Los_Angeles",
    "Madrid": "Europe/Madrid",
    "Melbourne": "Australia/Melbourne",
    "Mexico City": "America/Mexico_City",
    "Miami": "America/New_York",
    "Moscow": "Europe/Moscow",
    "Mumbai": "Asia/Kolkata",
    "New York": "America/New_York",
    "Paris": "Europe/Paris",
    "Rio de Janeiro": "America/Sao_Paulo",
    "San Francisco": "America/Los_Angeles",
    "Sao Paulo": "America/Sao_Paulo",
    "Singapore": "Asia/Singapore",
    "Sydney": "Australia/Sydney",
    "Tokyo": "Asia/Tokyo",
    "Toronto": "America/Toronto",
    "Vancouver": "America/Vancouver",
}
city_timezone_data = list(city_timezones.items())

# Both columns are required (non-nullable) strings.
schema = StructType(
    [StructField(name, StringType(), False) for name in ("city", "timezone")]
)

df_cities = spark.createDataFrame(city_timezone_data, schema=schema)
# Table creation is intentionally disabled for now:
# df_cities.write.mode("overwrite").saveAsTable("air_boltic_raw.cities")

df_cities.display()


city,timezone
Amsterdam,Europe/Amsterdam
Auckland,Pacific/Auckland
Bangkok,Asia/Bangkok
Beijing,Asia/Shanghai
Berlin,Europe/Berlin
Cape Town,Africa/Johannesburg
Chicago,America/Chicago
Dubai,Asia/Dubai
Frankfurt,Europe/Berlin
Hong Kong,Asia/Hong_Kong
