In [0]:
%sql
-- Preview every column of the flights_small table.
SELECT *
FROM databricks_airline_performance_data.v01.flights_small

In [0]:
# Load the flights_small table into a Spark DataFrame.
flights_small_table = "databricks_airline_performance_data.v01.flights_small"
flights_small_df = spark.read.table(flights_small_table)

In [0]:
# Print the column names and data types of the loaded flights DataFrame.
flights_small_df.printSchema()

In [0]:
# Render ten rows as a quick look at the loaded data.
flights_preview_df = flights_small_df.limit(10)
display(flights_preview_df)

In [0]:
# Keep only the columns used by the rest of the notebook.
required_columns = [
    "Year",
    "Month",
    "DayofMonth",
    "DepTime",
    "FlightNum",
    "ActualElapsedTime",
    "CRSElapsedTime",
    "ArrDelay",
]
flights_small_required_cols_df = flights_small_df.select(*required_columns)

# Record the row count before any cleaning step runs.
initial_count = flights_small_required_cols_df.count()
print(f"selected record count , {initial_count}")

In [0]:
# Examine the string columns ArrDelay, ActualElapsedTime and DepTime for values
# that are not valid integers by registering a temporary view that converts them
# to INT.
#
# try_cast is used instead of CAST: a value that cannot be parsed becomes NULL
# instead of raising when the view is queried with ANSI mode enabled. This also
# keeps the view definition identical to the one the null-count cells register
# under the same name.

flights_small_required_cols_df.selectExpr("Year",
                                      "Month",
                                      "DayofMonth",
                                      "try_cast(DepTime as INT) as DepTime",
                                      "FlightNum",
                                      "try_cast(ActualElapsedTime AS INT) AS ActualElapsedTime",
                                      "CRSElapsedTime",
                                      "try_cast(ArrDelay AS INT) AS ArrDelay"
                                      ).createOrReplaceTempView("flights_temp")


In [0]:
%sql
-- List the views visible in the global_temp schema.
-- NOTE(review): flights_temp is registered with createOrReplaceTempView
-- (session-scoped), not as a global temp view — confirm this listing is the
-- intended check.
SHOW VIEWS IN global_temp;

In [0]:
# Count NULLs per column with Spark SQL.
# The three string columns go through try_cast first, so values that cannot be
# parsed as INT are counted together with genuine NULLs.
try_cast_exprs = [
    "Year",
    "Month",
    "DayofMonth",
    "try_cast(DepTime as INT) as DepTime",
    "FlightNum",
    "try_cast(ActualElapsedTime AS INT) AS ActualElapsedTime",
    "CRSElapsedTime",
    "try_cast(ArrDelay AS INT) AS ArrDelay",
]
flights_small_required_cols_df.selectExpr(*try_cast_exprs).createOrReplaceTempView("flights_temp")

# One COUNT_IF per column, evaluated in a single pass over the view.
null_count_query = """
                              select COUNT_IF(Year is null) as null_year_count,
                                count_if(month is null) as null_month_count,
                                count_if(dayofmonth is null) as null_dayofmonth_count,
                                count_if(deptime is null) as null_deptime_count,
                                count_if(FlightNum is null) as null_flightnum_count,
                                count_if(ActualElapsedTime is null) as null_actualedelapsedtime_count,
                                count_if(CRSElapsedTime is null) as null_CRSElapsedTime_count,
                                count_if(arrdelay is null) as null_arrdelay_count
                                from flights_temp
                              """
invalid_count_sql = spark.sql(null_count_query)
display(invalid_count_sql)

In [0]:
# Count NULL / invalid values per column using the Spark DataFrame API.
#
# Spark's aggregate `sum` is reached through the `F` alias instead of being
# imported by name: `from pyspark.sql.functions import sum` would replace
# Python's built-in `sum` for every later cell of the notebook.
# `col` and `when` are still imported by name because later cells use them.
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import when

# Register the view with the three string columns converted via try_cast, so a
# value that cannot be parsed as INT shows up as NULL.
flights_small_required_cols_df.selectExpr("Year",
                                      "Month",
                                      "DayofMonth",
                                      "try_cast(DepTime as INT) as DepTime",
                                      "FlightNum",
                                      "try_cast(ActualElapsedTime AS INT) AS ActualElapsedTime",
                                      "CRSElapsedTime",
                                      "try_cast(ArrDelay AS INT) AS ArrDelay"
                                      ).createOrReplaceTempView("flights_temp")


flights_invalid_df = spark.table("flights_temp")

# Source column -> name of the output column holding its NULL count.
# Dict order is the output column order.
null_count_aliases = {
    "Year": "null_year_count",
    "Month": "null_month_count",
    "DayofMonth": "null_dayofmonth_count",
    "DepTime": "null_deptime_count",
    "FlightNum": "null_flightnum_count",
    "ActualElapsedTime": "null_actualedelapsedtime_count",
    "CRSElapsedTime": "null_crseelapsedtime_count",
    "ArrDelay": "null_arrdelay_count",
}

# For each column: emit 1 when the value is NULL, 0 otherwise, and sum the flags.
invalid_count_df = flights_invalid_df.select(
    *[
        F.sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(count_alias)
        for column_name, count_alias in null_count_aliases.items()
    ]
)

display(invalid_count_df)

In [0]:
# Print the execution plan Spark generated for the null-count aggregation.
invalid_count_df.explain()

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Pull the aggregated counts to the driver as a pandas DataFrame.
invalid_count_pd = invalid_count_df.toPandas()

# Bar labels come from the column names, bar heights from the first row.
columns = invalid_count_pd.columns
null_counts = invalid_count_pd.iloc[0].values

# Draw one bar per counted column on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(columns, null_counts, color='skyblue')
ax.set_xlabel('Column')
ax.set_ylabel('Null Count')
ax.set_title('Null Counts per Column')
plt.xticks(rotation=45)  # tilt the long column names so they do not overlap
fig.tight_layout()
plt.show()

In [0]:
# Discard rows whose CRSElapsedTime is NULL; NULLs in other columns are not considered here.
not_null_flights_df = flights_small_required_cols_df.dropna(
    how="any",
    subset=["CRSElapsedTime"],
)

In [0]:
# Preview the rows that remain after dropping NULL CRSElapsedTime values.
# flights_with_valid_data_df is only assigned in a later cell, so displaying it
# here raised NameError when the notebook was run top to bottom on a fresh
# session; the frame built by the preceding cell is shown instead.
not_null_flights_df.display()

In [0]:
from pyspark.sql.functions import col

# Keep a row only when ArrDelay, ActualElapsedTime and DepTime can each be
# converted to an integer (try_cast gives NULL for anything that cannot).
arr_delay_is_int = col("ArrDelay").try_cast("Integer").isNotNull()
actual_elapsed_is_int = col("ActualElapsedTime").try_cast("Integer").isNotNull()
dep_time_is_int = col("DepTime").try_cast("Integer").isNotNull()

flights_with_valid_data_df = not_null_flights_df.filter(
    arr_delay_is_int & actual_elapsed_is_int & dep_time_is_int
)

In [0]:
# Convert ArrDelay and ActualElapsedTime to integers, then print the resulting
# schema to confirm the new column types. DepTime is left as-is in this step.
clean_flights_data_df = (
    flights_with_valid_data_df
    .withColumn("ArrDelay", col("ArrDelay").cast("Integer"))
    .withColumn("ActualElapsedTime", col("ActualElapsedTime").cast("Integer"))
)
clean_flights_data_df.printSchema()

In [0]:
# Render the cleaned flights DataFrame (invalid rows removed, delay and elapsed time cast to integer).
display(clean_flights_data_df)

In [0]:
# Data Enrichment
# Combine Year, Month, DayofMonth and DepTime into a single FlightDateTime
# timestamp column, then drop the four source columns.

from pyspark.sql.functions import col,make_timestamp_ntz,lpad, substr,lit

# Left-pad DepTime with zeros to four characters; characters 1-2 are then used
# as the hour and characters 3-4 as the minute.
# NOTE(review): a DepTime such as 2400 would produce hour 24 — confirm how
# make_timestamp_ntz should treat out-of-range parts for this data set.
dep_time_padded = lpad(col("DepTime"),4,"0")
dep_hour = substr(dep_time_padded,lit(1),lit(2)).cast("integer")
dep_minute = substr(dep_time_padded,lit(3),lit(2)).cast("integer")

# Seconds are fixed at 0 because only hour and minute are taken from DepTime.
flight_datetime = make_timestamp_ntz(
    col("year"),
    col("Month"),
    col("DayofMonth"),
    dep_hour,
    dep_minute,
    lit(0),
)

flights_with_datetime_df = (
    clean_flights_data_df
    .withColumn("FlightDateTime", flight_datetime)
    .drop("Year","Month","DayofMonth","DepTime")
)

# show the result
display(flights_with_datetime_df.limit(10))

In [0]:
# Add ElapsedTimeDiff = ActualElapsedTime - CRSElapsedTime and drop the two
# columns it was derived from.

from pyspark.sql.functions import col

elapsed_time_diff = col("ActualElapsedTime") - col("CRSElapsedTime")

flights_elapsed_time_df = (
    flights_with_datetime_df
    .withColumn("ElapsedTimeDiff", elapsed_time_diff)
    .drop("ActualElapsedTime", "CRSElapsedTime")
)

display(flights_elapsed_time_df)

In [0]:
# Bucket each flight into a delay category based on ArrDelay, then drop ArrDelay.
#   ArrDelay <= 0   -> "On Time"
#   ArrDelay <= 15  -> "Slight Delay"
#   ArrDelay <= 60  -> "Moderate Delay"
#   anything else   -> "Severe Delay"
# Branches are evaluated in order, so each row gets the first category it matches.
# `when` is imported here so the cell does not rely on an import made by an
# earlier cell.
from pyspark.sql.functions import col, when

delay_category = (
    when(col("ArrDelay") <= 0, "On Time")
    .when(col("ArrDelay") <= 15, "Slight Delay")
    .when(col("ArrDelay") <= 60, "Moderate Delay")
    # Written with a space so the label follows the same format as the other
    # categories (it was previously "SevereDelay").
    .otherwise("Severe Delay")
)

flight_delay_category_df = (
    flights_elapsed_time_df
    .withColumn("delay_category", delay_category)
    .drop("ArrDelay")
)
display(flight_delay_category_df)

In [0]:
# Z-score of ElapsedTimeDiff computed with a pandas UDF.
#
# A Series-to-Series pandas UDF is called once per Arrow batch, not once for the
# whole column. Calling .mean() / .std() inside the UDF would therefore
# normalise every batch against its own statistics, and the result would change
# with batch size and partitioning. The mean and the sample standard deviation
# are aggregated over the whole DataFrame first and then captured by the UDF.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# stddev_samp is the sample standard deviation, the same estimator as pandas' Series.std().
_diff_stats = flight_delay_category_df.agg(
    F.mean("ElapsedTimeDiff").alias("diff_mean"),
    F.stddev_samp("ElapsedTimeDiff").alias("diff_std"),
).first()

# The aggregates can come back as NULL (e.g. no rows to aggregate); NaN keeps
# the arithmetic below well-defined and yields NaN scores in that case.
_diff_mean = float("nan") if _diff_stats["diff_mean"] is None else float(_diff_stats["diff_mean"])
_diff_std = float("nan") if _diff_stats["diff_std"] is None else float(_diff_stats["diff_std"])


@pandas_udf("double")
def normalized_diff(diff_series: pd.Series) -> pd.Series:
    """Return the z-score of each value in one batch of ElapsedTimeDiff.

    The mean and standard deviation are the dataset-wide values computed from
    flight_delay_category_df above, so every batch is normalised identically.

    Args:
        diff_series: One batch of ElapsedTimeDiff values.

    Returns:
        A Series of the same length holding (value - mean) / std as doubles.
    """
    return (diff_series - _diff_mean) / _diff_std


udf_example = flight_delay_category_df.withColumn("normalized_diff", normalized_diff("ElapsedTimeDiff"))

display(udf_example)