**Checking for mount points in Databricks**

In [0]:
dbutils.fs.mounts()

[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/mnt/adls', source='wasbs://filestore@hbgdevstorage.blob.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType=''),
 MountInfo(mountPoint='/volume', source='DbfsReserved', encryptionType='')]

**Mounting external Azure storage to hold the flight data**

This cell fails when it is run a second time, because the mount point /mnt/adls already exists after the first run (it appears in the mount listing above).

In [0]:
dbutils.fs.mount(
    source="wasbs://filestore@hbgdevstorage.blob.core.windows.net/",
    mount_point="/mnt/adls",
    extra_configs={"fs.azure.account.key.hbgdevstorage.blob.core.windows.net":"27z744YtQ/nekQxE4FyjgLSuIxkFVBY/OIaFTr0CvfecJResP4FCXcF+4GRX+/O3KRMCF56j4dqm+ASt6sCfjg=="}
)

---------------------------------------------------------------------------
ExecutionError                            Traceback (most recent call last)
File <command-5992162601892664>, line 1
----> 1 dbutils.fs.mount(
      2     source="wasbs://filestore@hbgdevstorage.blob.core.windows.net/",
      3     mount_point="/mnt/adls",
      4     extra_configs={"fs.azure.account.key.hbgdevstorage.blob.core.windows.net":"27z744YtQ/nekQxE4FyjgLSuIxkFVBY/OIaFTr0CvfecJResP4FCXcF+4GRX+/O3KRMCF56j4dqm+ASt6sCfjg=="}
      5 )

File /databricks/python_shell/lib/dbruntime/dbutils.py:172, in prettify_exception_message.<locals>.f_
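
One way to make the mount cell safe to rerun is to check the existing mounts first. The sketch below is only an illustration: the helper names `is_mounted` and `mount_if_absent` are made up for this example, and the secret scope and key names in the commented usage are hypothetical placeholders for a secret scope that would have to exist in the workspace. Reading the account key from a secret also keeps it out of the notebook source and out of tracebacks like the one above.

```python
def is_mounted(mounts, mount_point):
    """Return True if mount_point appears in a dbutils.fs.mounts()-style list."""
    target = mount_point.rstrip("/")
    return any(m.mountPoint.rstrip("/") == target for m in mounts)


def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount source at mount_point unless it is already mounted.

    Returns True if a new mount was created, False if it already existed.
    """
    if is_mounted(dbutils.fs.mounts(), mount_point):
        return False
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=extra_configs)
    return True


# Usage in a notebook, where dbutils is predefined.
# "my-scope" and "storage-account-key" are hypothetical names.
# account_key = dbutils.secrets.get(scope="my-scope", key="storage-account-key")
# mount_if_absent(
#     dbutils,
#     source="wasbs://filestore@hbgdevstorage.blob.core.windows.net/",
#     mount_point="/mnt/adls",
#     extra_configs={"fs.azure.account.key.hbgdevstorage.blob.core.windows.net": account_key},
# )
```

Passing `dbutils` in as an argument keeps the helper easy to exercise outside a notebook with a stand-in object.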

Once the mount point is created, I run the command below in my local terminal to copy the file from my local machine to the mount point.

databricks fs cp ~/Downloads/Flight_Data.csv dbfs:/mnt/adls/Flight_Data.csv

We can also upload the file manually to the storage path (in this case, directly in Azure).

**Listing the files in the mount point /mnt/adls/**

In [0]:
dbutils.fs.ls("/mnt/adls/")

[FileInfo(path='dbfs:/mnt/adls/Balaji_Fast_Food_Sales.csv', name='Balaji_Fast_Food_Sales.csv', size=62494, modificationTime=1743890368000),
 FileInfo(path='dbfs:/mnt/adls/Details.csv', name='Details.csv', size=63384, modificationTime=1740179475000),
 FileInfo(path='dbfs:/mnt/adls/Details1.csv', name='Details1.csv', size=479, modificationTime=1741729888000),
 FileInfo(path='dbfs:/mnt/adls/Flight_Data.csv', name='Flight_Data.csv', size=4929, modificationTime=1748824049000),
 FileInfo(path='dbfs:/mnt/adls/Orders.csv', name='Orders.csv', size=23224, modificationTime=1740179475000),
 FileInfo(path='dbfs:/mnt/adls/SalesDW/', name='SalesDW/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/adls/ingredients_csv/', name='ingredients_csv/', size=0, modificationTime=1743892580000),
 FileInfo(path='dbfs:/mnt/adls/products.csv', name='products.csv', size=14372, modificationTime=1740349112000)]

**1. Load the attached CSV file located in C:/user$/documents/ into a PySpark DataFrame**

I use read.format('csv') with the header option set to true, so the first line supplies the column names, and inferSchema set to true, so Spark derives the column types from the data. The file is read from the mount point created above into the dataframe df_flight_data.

In [0]:
df_flight_data=spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/adls/Flight_Data.csv")
df_flight_data.display()

flight_id,airline_code,flight_number,origin_airport,destination_airport,aircraft_type,departure_time,arrival_time,passenger_count,flight_distance,ticket_price,flight_status
FL001,NZ,101,AKL,CHC,Airbus A320,2024-01-15T08:00:00Z,2024-01-15T09:20:00Z,148,539,159.99,On Time
FL002,JQ,201,WLG,AKL,Airbus A320,2024-01-15T09:15:00Z,2024-01-15T10:30:00Z,156,484,89.99,Delayed
FL003,NZ,301,CHC,AKL,Boeing 737,2024-01-15T10:30:00Z,2024-01-15T11:50:00Z,142,539,149.99,On Time
FL004,VA,401,DUD,WLG,ATR 72,2024-01-15T12:00:00Z,2024-01-15T13:45:00Z,68,447,119.99,On Time
FL005,NZ,501,AKL,WLG,Airbus A321,2024-01-15T14:30:00Z,2024-01-15T15:45:00Z,163,484,109.99,Delayed
FL006,JQ,601,ROT,AKL,Airbus A320,2024-01-15T16:00:00Z,2024-01-15T17:10:00Z,134,230,79.99,On Time
FL007,NZ,701,WLG,CHC,Boeing 737,2024-01-15T18:45:00Z,2024-01-15T20:10:00Z,156,300,129.99,On Time
FL008,VA,801,HLZ,WLG,ATR 72,2024-01-15T20:00:00Z,2024-01-15T21:30:00Z,58,428,99.99,On Time
FL009,NZ,151,AKL,DUD,Airbus A320,2024-01-16T07:30:00Z,2024-01-16T09:15:00Z,159,986,199.99,Delayed
FL010,JQ,251,WLG,CHC,Airbus A320,2024-01-16T09:00:00Z,2024-01-16T10:25:00Z,168,300,89.99,On Time
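
As an alternative to inferSchema, the schema can be stated explicitly, which avoids the extra pass over the file and keeps the column types fixed between runs. The sketch below assumes the types that inferSchema reports for this file (the printSchema() output in step 2). The names `FLIGHT_COLUMNS`, `FLIGHT_SCHEMA_DDL` and `read_flight_data` are made up for this example, and it assumes the timestamps in the file are in a format Spark parses by default.

```python
# Column names and types copied from the printSchema() output in this notebook.
FLIGHT_COLUMNS = [
    ("flight_id", "STRING"),
    ("airline_code", "STRING"),
    ("flight_number", "INT"),
    ("origin_airport", "STRING"),
    ("destination_airport", "STRING"),
    ("aircraft_type", "STRING"),
    ("departure_time", "TIMESTAMP"),
    ("arrival_time", "TIMESTAMP"),
    ("passenger_count", "INT"),
    ("flight_distance", "INT"),
    ("ticket_price", "DOUBLE"),
    ("flight_status", "STRING"),
]

# DataFrameReader.schema() accepts a DDL-formatted string as well as a StructType.
FLIGHT_SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in FLIGHT_COLUMNS)


def read_flight_data(spark, path="/mnt/adls/Flight_Data.csv"):
    """Read the flight CSV with a fixed schema instead of inferSchema."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(FLIGHT_SCHEMA_DDL)
        .load(path)
    )


# Usage in a notebook, where spark is predefined:
# df_flight_data = read_flight_data(spark)
```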


**2. Display the schema of the DataFrame**

To display the schema, I use the printSchema() method.

In [0]:
df_flight_data.printSchema()

root
 |-- flight_id: string (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- flight_number: integer (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- aircraft_type: string (nullable = true)
 |-- departure_time: timestamp (nullable = true)
 |-- arrival_time: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- flight_distance: integer (nullable = true)
 |-- ticket_price: double (nullable = true)
 |-- flight_status: string (nullable = true)



**3. Show the first 10 rows**

I use limit(10) with display() to show the first 10 records. df.show(10) prints the same rows as plain text, and df.head(10) returns them as a list of Row objects.


In [0]:
df_flight_data.limit(10).display()

flight_id,airline_code,flight_number,origin_airport,destination_airport,aircraft_type,departure_time,arrival_time,passenger_count,flight_distance,ticket_price,flight_status
FL001,NZ,101,AKL,CHC,Airbus A320,2024-01-15T08:00:00Z,2024-01-15T09:20:00Z,148,539,159.99,On Time
FL002,JQ,201,WLG,AKL,Airbus A320,2024-01-15T09:15:00Z,2024-01-15T10:30:00Z,156,484,89.99,Delayed
FL003,NZ,301,CHC,AKL,Boeing 737,2024-01-15T10:30:00Z,2024-01-15T11:50:00Z,142,539,149.99,On Time
FL004,VA,401,DUD,WLG,ATR 72,2024-01-15T12:00:00Z,2024-01-15T13:45:00Z,68,447,119.99,On Time
FL005,NZ,501,AKL,WLG,Airbus A321,2024-01-15T14:30:00Z,2024-01-15T15:45:00Z,163,484,109.99,Delayed
FL006,JQ,601,ROT,AKL,Airbus A320,2024-01-15T16:00:00Z,2024-01-15T17:10:00Z,134,230,79.99,On Time
FL007,NZ,701,WLG,CHC,Boeing 737,2024-01-15T18:45:00Z,2024-01-15T20:10:00Z,156,300,129.99,On Time
FL008,VA,801,HLZ,WLG,ATR 72,2024-01-15T20:00:00Z,2024-01-15T21:30:00Z,58,428,99.99,On Time
FL009,NZ,151,AKL,DUD,Airbus A320,2024-01-16T07:30:00Z,2024-01-16T09:15:00Z,159,986,199.99,Delayed
FL010,JQ,251,WLG,CHC,Airbus A320,2024-01-16T09:00:00Z,2024-01-16T10:25:00Z,168,300,89.99,On Time


**4. Count the total number of records**

I am using count() to count the number of records in the dataframe and assigning it to a variable.

In [0]:
df_count=df_flight_data.count()
display(df_count)

50

**5. Filter out any records where passenger_count is less than or equal to 0**

Using filter(condition), I select the records with passenger_count less than or equal to 0 to check whether any exist. There are no such records, so the output shows only the header row and there is nothing to remove.

In [0]:
df_filter=df_flight_data.filter(df_flight_data['passenger_count'] <= 0)
df_filter.display()

flight_id,airline_code,flight_number,origin_airport,destination_airport,aircraft_type,departure_time,arrival_time,passenger_count,flight_distance,ticket_price,flight_status
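
The task wording, "filter out", suggests removing such rows rather than selecting them. A minimal sketch of that reading is to keep only the rows with a positive passenger_count. The function name `drop_non_positive_passengers` is made up for this example.

```python
def drop_non_positive_passengers(df):
    """Return only the rows whose passenger_count is greater than 0."""
    # filter() accepts a SQL expression string as well as a Column.
    # Rows with a null passenger_count are dropped too, because
    # NULL > 0 does not evaluate to true.
    return df.filter("passenger_count > 0")


# Usage in a notebook:
# df_valid = drop_non_positive_passengers(df_flight_data)
# df_valid.count()  # should equal df_flight_data.count() here, since the
#                   # check above found no rows with passenger_count <= 0
```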


**6. Add a new column flight_duration_hours that calculates the flight duration in hours.**

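To calculate the flight duration, I convert arrival_time and departure_time to Unix seconds with unix_timestamp() and subtract the departure value from the arrival value, which gives the duration in seconds. I then divide by 60*60 (i.e. 3600) to convert to hours and round the result to 2 decimal places. This creates a new dataframe, df_with_duration, with the new column duration_hours (a shorter name than the flight_duration_hours used in the task statement).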

In [0]:
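from pyspark.sql.functions import col, unix_timestamp, round  # this round shadows Python's built-in round()

# unix_timestamp() converts each timestamp to seconds since the epoch;
# the difference in seconds divided by 3600 gives the duration in hours
df_with_duration = df_flight_data.withColumn(
    "duration_hours",
    round((unix_timestamp("arrival_time") - unix_timestamp("departure_time")) / 3600.0, 2),
)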


Display df_with_duration

In [0]:
df_with_duration.display()

flight_id,airline_code,flight_number,origin_airport,destination_airport,aircraft_type,departure_time,arrival_time,passenger_count,flight_distance,ticket_price,flight_status,duration_hours
FL001,NZ,101,AKL,CHC,Airbus A320,2024-01-15T08:00:00Z,2024-01-15T09:20:00Z,148,539,159.99,On Time,1.33
FL002,JQ,201,WLG,AKL,Airbus A320,2024-01-15T09:15:00Z,2024-01-15T10:30:00Z,156,484,89.99,Delayed,1.25
FL003,NZ,301,CHC,AKL,Boeing 737,2024-01-15T10:30:00Z,2024-01-15T11:50:00Z,142,539,149.99,On Time,1.33
FL004,VA,401,DUD,WLG,ATR 72,2024-01-15T12:00:00Z,2024-01-15T13:45:00Z,68,447,119.99,On Time,1.75
FL005,NZ,501,AKL,WLG,Airbus A321,2024-01-15T14:30:00Z,2024-01-15T15:45:00Z,163,484,109.99,Delayed,1.25
FL006,JQ,601,ROT,AKL,Airbus A320,2024-01-15T16:00:00Z,2024-01-15T17:10:00Z,134,230,79.99,On Time,1.17
FL007,NZ,701,WLG,CHC,Boeing 737,2024-01-15T18:45:00Z,2024-01-15T20:10:00Z,156,300,129.99,On Time,1.42
FL008,VA,801,HLZ,WLG,ATR 72,2024-01-15T20:00:00Z,2024-01-15T21:30:00Z,58,428,99.99,On Time,1.5
FL009,NZ,151,AKL,DUD,Airbus A320,2024-01-16T07:30:00Z,2024-01-16T09:15:00Z,159,986,199.99,Delayed,1.75
FL010,JQ,251,WLG,CHC,Airbus A320,2024-01-16T09:00:00Z,2024-01-16T10:25:00Z,168,300,89.99,On Time,1.42
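
The duration values in the output above can be checked by hand. The sketch below repeats the same arithmetic (seconds between the two timestamps, divided by 3600, rounded to 2 places) in plain Python for a few rows of the output. The function name `duration_hours` is made up for this example.

```python
from datetime import datetime


def duration_hours(departure_iso, arrival_iso):
    """Hours between two ISO 8601 timestamps, rounded to 2 decimal places."""
    # fromisoformat() on older Python versions does not accept a trailing "Z",
    # so rewrite it as an explicit UTC offset.
    dep = datetime.fromisoformat(departure_iso.replace("Z", "+00:00"))
    arr = datetime.fromisoformat(arrival_iso.replace("Z", "+00:00"))
    seconds = (arr - dep).total_seconds()
    return round(seconds / 3600.0, 2)


# FL001: 08:00 -> 09:20 is 4800 s, and 4800 / 3600 = 1.333...
print(duration_hours("2024-01-15T08:00:00Z", "2024-01-15T09:20:00Z"))  # 1.33
# FL004: 12:00 -> 13:45 is 6300 s, and 6300 / 3600 = 1.75
print(duration_hours("2024-01-15T12:00:00Z", "2024-01-15T13:45:00Z"))  # 1.75
# FL008: 20:00 -> 21:30 is 5400 s, and 5400 / 3600 = 1.5
print(duration_hours("2024-01-15T20:00:00Z", "2024-01-15T21:30:00Z"))  # 1.5
```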


**Adding Assertions on the dataframe**

  1. Check that origin_airport and destination_airport are never the same. My assumption is that the data should contain only valid flights.
  2. Check that arrival_time is always later than departure_time.
  3. Check that no column contains null values. None of the columns appears to have nulls in the data, so I assert that every column has zero null values.

In [0]:
# Count rows where origin == destination
same_location_count = df_with_duration.filter(col("origin_airport") == col("destination_airport")).count()

# Assert that no row has the same origin and destination
assert same_location_count == 0, f"Found {same_location_count} rows where origin and destination are the same"

# Count rows where arrival <= departure
invalid_time_count = df_with_duration.filter(col("arrival_time") <= col("departure_time")).count()

# Assert that no row has arrival time <= departure time
assert invalid_time_count == 0, f"FAIL: {invalid_time_count} rows with arrival <= departure"

# Assert that no column contains null values (one count per column)
for column in df_with_duration.columns:
    null_count = df_with_duration.filter(col(column).isNull()).count()
    assert null_count == 0, f"Column '{column}' has {null_count} null values"
    print(f"PASS: Column '{column}' has no nulls")


PASS: Column 'flight_id' has no nulls
PASS: Column 'airline_code' has no nulls
PASS: Column 'flight_number' has no nulls
PASS: Column 'origin_airport' has no nulls
PASS: Column 'destination_airport' has no nulls
PASS: Column 'aircraft_type' has no nulls
PASS: Column 'departure_time' has no nulls
PASS: Column 'arrival_time' has no nulls
PASS: Column 'passenger_count' has no nulls
PASS: Column 'flight_distance' has no nulls
PASS: Column 'ticket_price' has no nulls
PASS: Column 'flight_status' has no nulls
PASS: Column 'duration_hours' has no nulls
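
The null check above runs one count per column, so 13 separate Spark jobs for this dataframe. On larger data it may be worth counting the nulls for all columns in a single aggregation instead. The following is a sketch of that idea, not a replacement for the cell above. The helper names `null_count_exprs`, `null_counts` and `assert_no_nulls` are made up for this example.

```python
def null_count_exprs(columns):
    """Build one SQL aggregate per column that counts its null values."""
    return [
        f"sum(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS `{c}`"
        for c in columns
    ]


def null_counts(df):
    """Return {column: null_count} using a single aggregation over df."""
    row = df.selectExpr(*null_count_exprs(df.columns)).first()
    # sum() over zero rows yields NULL, so treat None as 0.
    return {c: (row[c] or 0) for c in df.columns}


def assert_no_nulls(df):
    """Raise AssertionError listing every column that contains nulls."""
    bad = {c: n for c, n in null_counts(df).items() if n > 0}
    assert not bad, f"Columns with null values: {bad}"


# Usage in a notebook:
# assert_no_nulls(df_with_duration)
```

Reporting all offending columns in one message, rather than stopping at the first, makes a failed run easier to diagnose.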
