In [0]:
%sql

-- List the available catalogs (SQL counterpart of spark.catalog.listCatalogs()).
SHOW CATALOGS;

In [0]:
# Same catalog listing through the PySpark Catalog API; the bare expression is
# rendered as the cell output.
spark.catalog.listCatalogs()

In [0]:
# List the tables registered under databricks_airline_performance_data.v01.
# The bare `tables` expression on the last line renders the result as the cell output.
tables = spark.catalog.listTables("databricks_airline_performance_data.v01")
tables


## Data Loading

1. Read directly from the catalog with spark.read.table() (flights_small)
2. Inspect a subset of the data
3. Remove columns that are not needed
4. Drop rows that are not required
5. Count the number of records
6. Use .selectExpr().createOrReplaceTempView() to run SQL on the view
7. Use Spark SQL to count NULL values per attribute/column
8. Use the DataFrame API to **COUNT** NULL values per attribute/column
9. Show the execution plan with .explain() for both the Spark SQL object and the DataFrame API

In [0]:
# Read the catalog table into a (lazily evaluated) DataFrame.
df = spark.read.table("databricks_airline_performance_data.v01.flights_small")

# Keep only the columns used by the rest of the notebook.
df_new = df.select(
    [
        "Year", "Month", "DayofMonth", "DayOfWeek",
        "DepTime", "ArrTime", "ActualElapsedTime",
        "UniqueCarrier", "FlightNum",
        "Origin", "Dest", "Distance",
        "CarrierDelay", "WeatherDelay", "SecurityDelay",
        "IsArrDelayed", "IsDepDelayed",
    ]
)

display(df_new)

Databricks visualization. Run in Databricks to view.

In [0]:
# Print the column names and data types of the projected DataFrame.
df_new.printSchema()

# count() is an action: it runs a Spark job and returns the number of rows.
print(df_new.count())

In [0]:

# Count the NULL entries of every column in one aggregation.
# To look at the affected rows themselves instead, e.g.:
#   display(df_new.filter(df_new['CarrierDelay'].isNull()))

from pyspark.sql.functions import col, count, when

# when(...) without otherwise() yields NULL for non-matching rows and count()
# skips NULLs, so each aggregate equals the number of NULLs in that column.
# NOTE(review): placeholder strings such as "NA" (handled for ArrTime in a later
# cell) are not NULL and are therefore not counted here.
null_counts = df_new.select(
    *(
        count(when(col(name).isNull(), True)).alias(name)
        for name in df_new.columns
    )
)

null_counts.show()
display(type(null_counts)) # dataframe

In [0]:
# Discard every row in which ArrTime or DepTime is NULL.
# DataFrame.dropna() is an alias of DataFrame.na.drop(). Other options:
#   how="all"  -> drop a row only if all subset columns are NULL
#   thresh=<n> -> drop rows with fewer than n non-NULL values (overrides `how`)
# Alternative kept for reference (matches the literal text 'NA', not NULL):
#   df_new = df_new.filter(col("ArrTime").contains('NA'))
df_non_null = df_new.dropna(how="any", subset=["ArrTime", "DepTime"])

df_non_null.show()

In [0]:
from pyspark.sql.functions import when, col

# 1. Turn the literal string "NA" in ArrTime into NULL and cast the column to int.
# 2. Replace the remaining NULLs with 0 (an integer fill value only applies to
#    numeric columns).
# NOTE(review): step 2 also turns every former "NA" arrival time into 0 —
# confirm that this is intended rather than dropping those rows.
df_non_na = (
    df_non_null
    .withColumn(
        "ArrTime",
        when(col("ArrTime") == "NA", None).otherwise(col("ArrTime")).cast("int"),
    )
    .na.fill(0)
)

df_non_na.show()

In [0]:

# count(<column>) ignores NULLs, so this yields the number of non-NULL values
# per column (relies on `count` imported in an earlier cell).
not_null_counts = df_non_null.select(
    *(count(name).alias(name) for name in df_non_null.columns)
)

not_null_counts.show()

In [0]:
# Register a five-column projection of df_non_null as the temp view `vw_df_new`
# so that it can be queried from %sql cells.
# Tip: selectExpr() parses SQL expressions, so a cast can be done inline, e.g.
# "CAST(DepTime AS INT) AS DepTime" (note that "DepTime as INT" would only
# rename the column to INT, not cast it).
(
    df_non_null
    .selectExpr(
        "UniqueCarrier",
        "FlightNum",
        "DepTime",
        "ArrTime",
        "ActualElapsedTime",
    )
    .createOrReplaceTempView("vw_df_new")
)

In [0]:
# Load the temp view back as a DataFrame and preview its first rows.
df_vw = spark.read.table("vw_df_new")

df_vw.show()

In [0]:
%sql

-- Number of flights (non-NULL FlightNum values) per carrier.
SELECT
  uniqueCarrier,
  COUNT(flightNum) AS No_of_Flights
FROM vw_df_new
GROUP BY uniqueCarrier;

In [0]:
%sql

-- NULL count per column of the view (Spark SQL version of the DataFrame null count).
SELECT
  COUNT_IF(UniqueCarrier IS NULL)     AS UniqueCarrier,
  COUNT_IF(FlightNum IS NULL)         AS FlightNum,
  COUNT_IF(DepTime IS NULL)           AS DepTime,
  COUNT_IF(ArrTime IS NULL)           AS ArrTime,
  COUNT_IF(ActualElapsedTime IS NULL) AS ActualElapsedTime
FROM vw_df_new;

In [0]:
# Print the physical plan Spark would execute for df_non_null
# (called without arguments, explain() shows only the physical plan).
df_non_null.explain()


## Databricks AI Assistant

1. Generate code using the AI Assistant and plot visualizations


## Data Cleaning

1. Drop invalid or missing values with df.na.drop() (a lazily evaluated transformation)
2. Fix data type issues by casting (e.g. non-integer values), and remove rows with invalid values in specific columns
3. Replace existing columns using withColumn()


## Data Enrichment

1. Derive a DateTime column from the Year, Month, DayofMonth, DepTime, etc. columns. Use withColumn().drop('source columns') together with make_timestamp_ntz(), substr(), lit(), and lpad()
2. Calculate Elapsed Time Difference
3. Categorize a column e.g., ArrDelay (Moderate Delay, Severe, On-Time)

*.withColumn()* is used to create a new column or to replace an existing column with updated data

In [0]:
# Row count per Year (select('*') is a no-op projection, so it is omitted).
df_non_na.groupBy("Year").count().show()

In [0]:
from pyspark.sql.functions import col, lit, make_timestamp_ntz

# Build a timestamp (without time zone) from the date parts and the arrival time.
# ArrTime is a clock time packed as an HHMM integer (e.g. 1430 means 14:30), so it
# must not be passed as the hour directly: split it into hour and minute.
# NOTE(review): assumes the HHMM encoding of the airline on-time data — confirm.
# NOTE(review): rows whose "NA" arrival time was replaced by 0 upstream show 00:00,
# and a value of 2400 (if present) gives hour 24, which is not a valid hour.
df_date_time_derived = df_non_na.withColumn(
    "DateTime",
    make_timestamp_ntz(
        col("Year"),
        col("Month"),
        col("DayofMonth"),
        (col("ArrTime") / 100).cast("int"),  # hour   = HHMM div 100
        (col("ArrTime") % 100).cast("int"),  # minute = HHMM mod 100
        lit(0),                              # seconds are not part of ArrTime
    ),
)

# e.g. Year=1999, Month=4, DayofMonth=6, ArrTime=1430 -> 1999-04-06 14:30:00
df_date_time_derived.show()

In [0]:
# Add a label derived from ArrTime and preview the result (nothing is assigned).
# NOTE(review): ArrTime looks like a clock time rather than a delay; with these
# thresholds only ArrTime == 100 (or NULL) reaches the "Delayed" branch —
# confirm the intended column and cut-offs.
(
    df_non_na
    .withColumn(
        "Arrival Time Category",
        when(col("ArrTime") < 100, "Okay")
        .when(col("ArrTime") > 100, "On-Time")
        .otherwise("Delayed"),
    )
    .show()
)


## Analyze Data

1. Direct Reference .select()
2. Column Object .select(col().alias())
3. String Expression .selectExpr(colA, colB, Expression as colC)

In [0]:
# Direct reference: select a column by passing its name as a plain string.
df_non_na.select("FlightNum").show()

In [0]:
# Column object: col() builds a Column, which allows alias() to rename it in the output.
df_non_na.select(col("FlightNum").alias("Flight Number")).show()


In [0]:
# String expression: each argument is parsed as SQL, so "FlightNum > 2 as FlightNumber"
# produces a boolean comparison column named FlightNumber next to the original FlightNum.
df_non_na.selectExpr("FlightNum", "FlightNum > 2 as FlightNumber").show()


## UDF

1. Vectorized UDF: @pandas_udf("double") (pandas Series -> Series), e.g., normalizedDiff

In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def distance_category(flight_number_series: pd.Series) -> pd.Series:
    """Label each value of a column as "High" (greater than 250) or "Low".

    Runs as a vectorized (pandas) UDF: Spark hands over a batch of column values
    as a pandas Series and expects a Series of the same length back.

    Args:
        flight_number_series: Batch of values to categorize. The parameter name is
            kept for compatibility; this notebook applies the UDF to the
            ``Distance`` column. Values may be numbers or numeric strings.

    Returns:
        A Series of "High"/"Low" strings. Entries that are missing or cannot be
        parsed as a number (e.g. "NA") become None (NULL in Spark) instead of
        raising, as the previous row-by-row ``int(x)`` conversion did.
    """
    # Coerce once for the whole batch; unparseable entries turn into NaN.
    values = pd.to_numeric(flight_number_series, errors="coerce")

    labels = pd.Series("Low", index=values.index, dtype=object)
    labels[values > 250] = "High"   # comparisons with NaN are False, so NaN stays "Low" here...
    labels[values.isna()] = None    # ...and is then replaced by NULL
    return labels

df_non_na.select("FlightNum", distance_category("Distance").alias("DistanceCategory")).show()


## Put it All Together

1. Drop table
2. Chain it all together