## Import libraries and create the Spark session

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp,round

# Obtain a Spark session: getOrCreate() returns the already-active session if
# the runtime provides one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName('Flight Data Analysis')
    .getOrCreate()
)


### Load flight data into a DataFrame

In [0]:
# Raw flight CSV on DBFS. The file has a header row (header=True), and column
# types are inferred from the data (inferSchema=True). Inference costs Spark an
# extra pass over the file; the "Enforce schema" cell casts every column
# explicitly afterwards.
file_path = "dbfs:/FileStore/tables/Flight_Data.csv"
flight_df = spark.read.csv(file_path, inferSchema=True, header=True)

#### Print the schema of the DataFrame

In [0]:
# Show the column names, types and nullability that Spark inferred from the CSV.
flight_df.printSchema()

root
 |-- flight_id: string (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- flight_number: integer (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- aircraft_type: string (nullable = true)
 |-- departure_time: timestamp (nullable = true)
 |-- arrival_time: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- flight_distance: integer (nullable = true)
 |-- ticket_price: double (nullable = true)
 |-- flight_status: string (nullable = true)



#### Enforce schema

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, DoubleType, TimestampType

# Target Spark type for each column, listed in output order (dicts keep
# insertion order). Casting explicitly pins the schema rather than relying
# on whatever inferSchema produced.
target_schema = {
    "flight_id": StringType(),
    "airline_code": StringType(),
    "flight_number": IntegerType(),
    "origin_airport": StringType(),
    "destination_airport": StringType(),
    "aircraft_type": StringType(),
    "departure_time": TimestampType(),
    "arrival_time": TimestampType(),
    "passenger_count": IntegerType(),
    "flight_distance": IntegerType(),
    "ticket_price": DoubleType(),
    "flight_status": StringType(),
}

flight_df = flight_df.select(
    *[col(column_name).cast(spark_type) for column_name, spark_type in target_schema.items()]
)



#### Display a few records

In [0]:
# Print the first 10 rows without truncating long cell values.
flight_df.show(n=10, truncate=False)

+---------+------------+-------------+--------------+-------------------+-------------+-------------------+-------------------+---------------+---------------+------------+-------------+
|flight_id|airline_code|flight_number|origin_airport|destination_airport|aircraft_type|departure_time     |arrival_time       |passenger_count|flight_distance|ticket_price|flight_status|
+---------+------------+-------------+--------------+-------------------+-------------+-------------------+-------------------+---------------+---------------+------------+-------------+
|FL001    |NZ          |101          |AKL           |CHC                |Airbus A320  |2024-01-15 08:00:00|2024-01-15 09:20:00|148            |539            |159.99      |On Time      |
|FL002    |JQ          |201          |WLG           |AKL                |Airbus A320  |2024-01-15 09:15:00|2024-01-15 10:30:00|156            |484            |89.99       |Delayed      |
|FL003    |NZ          |301          |CHC           |AKL         

#### Total number of records

In [0]:
# Total row count; this triggers a Spark job over the whole DataFrame.
flight_df.count()

Out[19]: 50

### Count null values in each column

In [0]:
# NOTE(review): this import rebinds the notebook-global `sum` to
# pyspark.sql.functions.sum, shadowing Python's builtin for all later cells.
from pyspark.sql.functions import col, sum

# One aggregate per column: cast the is-null flag to int and add it up, so
# each output cell is the number of nulls in that column.
null_count_exprs = [
    sum(col(column_name).isNull().cast('int')).alias(column_name)
    for column_name in flight_df.columns
]
flight_df.select(*null_count_exprs).show()

+---------+------------+-------------+--------------+-------------------+-------------+--------------+------------+---------------+---------------+------------+-------------+
|flight_id|airline_code|flight_number|origin_airport|destination_airport|aircraft_type|departure_time|arrival_time|passenger_count|flight_distance|ticket_price|flight_status|
+---------+------------+-------------+--------------+-------------------+-------------+--------------+------------+---------------+---------------+------------+-------------+
|        0|           0|            0|             0|                  0|            0|             0|           0|              0|              0|           0|            0|
+---------+------------+-------------+--------------+-------------------+-------------+--------------+------------+---------------+---------------+------------+-------------+



### Flag bad records (instead of dropping them)

In [0]:
from pyspark.sql.functions import when

# A row is valid when it carries at least one passenger and arrives after it
# departs. If either comparison is null, when() does not match and the row
# falls through to otherwise(False), so is_valid is never null.
is_valid_condition = (col("passenger_count") > 0) & (col("arrival_time") > col("departure_time"))

flight_df = flight_df.withColumn(
    "is_valid",
    when(is_valid_condition, True).otherwise(False),
)




In [0]:
# Keep only rows flagged valid. A boolean column is already a predicate, so
# comparing it with `== True` is redundant. A null predicate drops the row
# either way, so the result is unchanged.
flight_df_cleaned = flight_df.filter(col("is_valid"))
# Rows kept (printed) vs. total rows (cell output); equal counts mean no row
# was flagged invalid.
print(flight_df_cleaned.count())
flight_df.count()

50
Out[25]: 50

#### Derive a new column `flight_duration_hours` (flight time in hours)

In [0]:
# Elapsed seconds between departure and arrival (unix_timestamp yields epoch
# seconds), converted to hours and rounded to 2 decimals. `round` here is
# pyspark.sql.functions.round from the first cell's import, not the builtin.
seconds_in_flight = unix_timestamp("arrival_time") - unix_timestamp("departure_time")
df_with_duration = flight_df_cleaned.withColumn(
    "flight_duration_hours",
    round(seconds_in_flight / 3600, 2),
)

In [0]:
# Spot-check the derived duration against the raw timestamps it came from.
duration_preview_cols = ["departure_time", "arrival_time", "flight_duration_hours"]
df_with_duration.select(*duration_preview_cols).show(n=5)

+-------------------+-------------------+---------------------+
|     departure_time|       arrival_time|flight_duration_hours|
+-------------------+-------------------+---------------------+
|2024-01-15 08:00:00|2024-01-15 09:20:00|                 1.33|
|2024-01-15 09:15:00|2024-01-15 10:30:00|                 1.25|
|2024-01-15 10:30:00|2024-01-15 11:50:00|                 1.33|
|2024-01-15 12:00:00|2024-01-15 13:45:00|                 1.75|
|2024-01-15 14:30:00|2024-01-15 15:45:00|                 1.25|
+-------------------+-------------------+---------------------+
only showing top 5 rows

