In [0]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("airlineAnalytics").getOrCreate()

In [0]:
# Define the parameters for your Azure Blob Storage
storage_account_name = "dataairline"
container_name = "data"
sas_token = "sp=racwdli&st=2024-05-01T04:49:42Z&se=2024-10-05T12:49:42Z&spr=https&sv=2022-11-02&sr=c&sig=ZcMW5BxSC9i%2FXgzD%2BRUxHqUPrguVrYKgPi4jTRC1CaQ%3D"  



In [0]:
# Define the mount point path in DBFS (Databricks File System)
dbfs_mount_point = "/mnt/dataairline/data"

# Check if the mount point already exists, unmount if it does
if any(mount.mountPoint == dbfs_mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(dbfs_mount_point)

# Mount the Azure Blob Storage container
configs = {
  "fs.azure.sas." + container_name + "." + storage_account_name + ".blob.core.windows.net": sas_token
}

dbutils.fs.mount(
  source = "wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net",
  mount_point = dbfs_mount_point,
  extra_configs = configs
)

# To verify that the mount was successful
display(dbutils.fs.ls(dbfs_mount_point))


/mnt/dataairline/data has been unmounted.


path,name,size,modificationTime
dbfs:/mnt/dataairline/data/merged_data.csv,merged_data.csv,6931362239,1714460780000


In [0]:
spark = SparkSession.builder \
    .config("spark.driver.maxResultSize", "8g") \
    .getOrCreate()

In [0]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
# Create a Spark session
spark = SparkSession.builder \
    .appName("AirlineDelayPrediction") \
    .getOrCreate()

# Load the data from CSV file
df = spark.read.csv("/mnt/dataairline/data/merged_data.csv", header=True)

# Display the schema and some sample data
df.printSchema()
df.show(5)


root
 |-- _c0: string (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_TIME: string (nullable = true)
 |-- ARR_DELAY: string (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- DIVERTED: string (nullable = true)
 |-- CRS_ELAPSED_TIME: string (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)

+---+----------+----------+-----------------+------+----+------------+--------+---------+--------+-------+------------+--------+---------+---------+--------+---------

In [0]:
# Select relevant columns and convert them to the appropriate data types
selected_cols = ['DEP_DELAY', 'TAXI_OUT', 'TAXI_IN', 'CRS_ARR_TIME', 'CRS_ELAPSED_TIME', 'DISTANCE']
df = df.select(*selected_cols)
df = df.withColumn('DEP_DELAY', df['DEP_DELAY'].cast('float'))
df = df.withColumn('TAXI_OUT', df['TAXI_OUT'].cast('float'))
df = df.withColumn('TAXI_IN', df['TAXI_IN'].cast('float'))
df = df.withColumn('CRS_ARR_TIME', df['CRS_ARR_TIME'].cast('float'))
df = df.withColumn('CRS_ELAPSED_TIME', df['CRS_ELAPSED_TIME'].cast('float'))
df = df.withColumn('DISTANCE', df['DISTANCE'].cast('float'))

# Drop rows with null values
df = df.dropna()


In [0]:
df.head(5)

[Row(DEP_DELAY=-5.0, TAXI_OUT=15.0, TAXI_IN=10.0, CRS_ARR_TIME=1745.0, CRS_ELAPSED_TIME=268.0, DISTANCE=1605.0),
 Row(DEP_DELAY=-8.0, TAXI_OUT=11.0, TAXI_IN=7.0, CRS_ARR_TIME=1254.0, CRS_ELAPSED_TIME=99.0, DISTANCE=414.0),
 Row(DEP_DELAY=-5.0, TAXI_OUT=15.0, TAXI_IN=5.0, CRS_ARR_TIME=1649.0, CRS_ELAPSED_TIME=134.0, DISTANCE=846.0),
 Row(DEP_DELAY=6.0, TAXI_OUT=19.0, TAXI_IN=6.0, CRS_ARR_TIME=1756.0, CRS_ELAPSED_TIME=190.0, DISTANCE=1120.0),
 Row(DEP_DELAY=20.0, TAXI_OUT=13.0, TAXI_IN=10.0, CRS_ARR_TIME=922.0, CRS_ELAPSED_TIME=112.0, DISTANCE=723.0)]

In [0]:
# Create a feature vector
feature_cols = ['TAXI_OUT', 'TAXI_IN', 'CRS_ARR_TIME', 'CRS_ELAPSED_TIME', 'DISTANCE']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df)

In [0]:
df.head(5)

[Row(DEP_DELAY=-5.0, TAXI_OUT=15.0, TAXI_IN=10.0, CRS_ARR_TIME=1745.0, CRS_ELAPSED_TIME=268.0, DISTANCE=1605.0, features=DenseVector([15.0, 10.0, 1745.0, 268.0, 1605.0])),
 Row(DEP_DELAY=-8.0, TAXI_OUT=11.0, TAXI_IN=7.0, CRS_ARR_TIME=1254.0, CRS_ELAPSED_TIME=99.0, DISTANCE=414.0, features=DenseVector([11.0, 7.0, 1254.0, 99.0, 414.0])),
 Row(DEP_DELAY=-5.0, TAXI_OUT=15.0, TAXI_IN=5.0, CRS_ARR_TIME=1649.0, CRS_ELAPSED_TIME=134.0, DISTANCE=846.0, features=DenseVector([15.0, 5.0, 1649.0, 134.0, 846.0])),
 Row(DEP_DELAY=6.0, TAXI_OUT=19.0, TAXI_IN=6.0, CRS_ARR_TIME=1756.0, CRS_ELAPSED_TIME=190.0, DISTANCE=1120.0, features=DenseVector([19.0, 6.0, 1756.0, 190.0, 1120.0])),
 Row(DEP_DELAY=20.0, TAXI_OUT=13.0, TAXI_IN=10.0, CRS_ARR_TIME=922.0, CRS_ELAPSED_TIME=112.0, DISTANCE=723.0, features=DenseVector([13.0, 10.0, 922.0, 112.0, 723.0]))]

In [0]:
# Split the data into training and test sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)



In [0]:
train_df.count()

49243105

In [0]:
# Define the base models
lr = LinearRegression(featuresCol='features', labelCol='DEP_DELAY')
rf = RandomForestRegressor(featuresCol='features', labelCol='DEP_DELAY')
gbt = GBTRegressor(featuresCol='features', labelCol='DEP_DELAY')


In [0]:
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline



In [0]:
lr = LinearRegression(featuresCol='features', labelCol='DEP_DELAY')
rf = RandomForestRegressor(featuresCol='features', labelCol='DEP_DELAY')
gbt = GBTRegressor(featuresCol='features', labelCol='DEP_DELAY')

pipeline = Pipeline(stages=[lr.setPredictionCol('lr_prediction'), rf.setPredictionCol('rf_prediction'), gbt.setPredictionCol('gbt_prediction')])
model = pipeline.fit(train_df)

Downloading artifacts:   0%|          | 0/33 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
# Make predictions
predictions = model.transform(test_df)


In [0]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol='DEP_DELAY', predictionCol='lr_prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"RMSE for Linear Regression: {rmse}")

RMSE for Linear Regression: 36.50913146764792


In [0]:
evaluator = RegressionEvaluator(labelCol='DEP_DELAY', predictionCol='rf_prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"RMSE for Random Forest: {rmse}")


RMSE for Random Forest: 36.4310083658203


In [0]:
evaluator = RegressionEvaluator(labelCol='DEP_DELAY', predictionCol='gbt_prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f"RMSE for Gradient Boosting: {rmse}")

RMSE for Gradient Boosting: 36.370579661035116


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assuming 'prediction' column contains the model's raw predictions
evaluator = BinaryClassificationEvaluator(labelCol='DEP_DELAY', rawPredictionCol='gbt_prediction', metricName='areaUnderROC')
accuracy = evaluator.evaluate(predictions)

In [0]:
print(f"accuracy for Gradient Boosting: {accuracy}")

RMSE for Gradient Boosting: 0.6256527308937533


In [0]:
import os

# Check if the directory exists and create if not
directory = "/mnt/dataairline/data"
if not os.path.exists(directory):
    os.makedirs(directory)

# Delete the file if it already exists
file_path = os.path.join(directory, "model")
if os.path.exists(file_path):
    os.remove(file_path)

# Save the model
model.write().overwrite().save(file_path)

In [0]:
# Save the predictions
predictions.write.save("/mnt/dataairline/predictions", format='parquet')

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor, GeneralizedLinearRegression
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator



In [0]:
# Decision Tree
dt = DecisionTreeRegressor(featuresCol='features', labelCol='DEP_DELAY')
dt_model = dt.fit(train_df)

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
dt_predictions = dt_model.transform(test_df)
dt_predictions.show()  # Check the column names available in the DataFrame
dt_rmse = evaluator.evaluate(dt_predictions.withColumnRenamed("prediction", "gbt_prediction"))
print(f"RMSE for Decision Tree: {dt_rmse}")

+---------+--------+-------+------------+----------------+--------+--------------------+------------------+
|DEP_DELAY|TAXI_OUT|TAXI_IN|CRS_ARR_TIME|CRS_ELAPSED_TIME|DISTANCE|            features|        prediction|
+---------+--------+-------+------------+----------------+--------+--------------------+------------------+
|    -60.0|    51.0|    6.0|      1835.0|           135.0|   689.0|[51.0,6.0,1835.0,...| 23.72516723569282|
|    -51.0|    18.0|    3.0|      1917.0|            45.0|   198.0|[18.0,3.0,1917.0,...| 5.907273614096057|
|    -50.0|    14.0|    9.0|      1920.0|            48.0|   198.0|[14.0,9.0,1920.0,...| 5.907273614096057|
|    -49.0|    69.0|   11.0|        23.0|           333.0|  2475.0|[69.0,11.0,23.0,3...| 10.20656256644067|
|    -46.0|     8.0|    5.0|      2030.0|            70.0|   345.0|[8.0,5.0,2030.0,7...| 12.94960468951689|
|    -45.0|     6.0|    3.0|      2210.0|           179.0|  1192.0|[6.0,3.0,2210.0,1...|14.523343571860387|
|    -43.0|     4.0|    4.0|