In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_timestamp, dayofweek, hour
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
input_path = "dbfs:/FileStore/shared_uploads/elsa4940@mylaurier.ca/yellow_tripdata_2015_01.csv"

# Read the raw CSV; the first line holds the column names.
# No schema inference is requested, so every column arrives as a string and
# is cast explicitly in the data-preparation cell.
df = spark.read.option("header", "true").csv(input_path)

# Quick sanity check of the loaded columns and a handful of rows
df.printSchema()
df.show(5)

## Data Preparation

In [0]:
# Convert necessary columns to appropriate data types:
# numeric columns to float, pickup/dropoff strings to timestamps.
df = df.withColumn("fare_amount", col("fare_amount").cast("float")) \
       .withColumn("trip_distance", col("trip_distance").cast("float")) \
       .withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
       .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))

# Create new time-based features derived from the pickup timestamp
df = df.withColumn("pickup_hour", hour(col("pickup_datetime"))) \
       .withColumn("day_of_week", dayofweek(col("pickup_datetime")))

# Drop rows with null values in any column the models depend on ...
df = df.dropna(subset=["fare_amount", "trip_distance", "pickup_datetime", "dropoff_datetime"])

# ... and rows with invalid values: a non-positive fare or distance cannot
# describe a real trip and would distort both the classifier and the regressors.
df = df.filter((col("fare_amount") > 0) & (col("trip_distance") > 0))


## Classification - Target Variable Creation

In [0]:
# Binary classification label: 1 when fare_amount exceeds 20, otherwise 0.
is_high_fare = col("fare_amount") > 20
df = df.withColumn("high_fare", is_high_fare.cast("int"))

# 70/30 train/test split; the fixed seed keeps the split repeatable for the same input.
train_df, test_df = df.randomSplit(weights=[0.7, 0.3], seed=42)


## Classification Pipeline 1 - Decision Tree

In [0]:
# Define pipeline stages.
# `assembler` and `scaler` are also reused by the later pipeline cells.
assembler = VectorAssembler(inputCols=["trip_distance", "pickup_hour", "day_of_week"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
dt = DecisionTreeClassifier(featuresCol="scaled_features", labelCol="high_fare")

# Create pipeline
pipeline_dt = Pipeline(stages=[assembler, scaler, dt])

# Hyperparameter tuning: 3 x 3 = 9 candidate trees, each scored on 3 folds
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Model selection is driven by area under the ROC curve
# (the evaluator's default metric, stated explicitly here).
crossval = CrossValidator(estimator=pipeline_dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="high_fare", metricName="areaUnderROC"),
                          numFolds=3)

# Train model
cv_model_dt = crossval.fit(train_df)

# Evaluate model on the held-out split.
# BinaryClassificationEvaluator yields AUC, not F1, so F1 is computed separately
# from the hard `prediction` column ("f1" is Spark's weighted F1 over both labels).
predictions_dt = cv_model_dt.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="high_fare", metricName="areaUnderROC")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="high_fare", predictionCol="prediction", metricName="f1")
auc_dt = evaluator.evaluate(predictions_dt)
f1_score_dt = f1_evaluator.evaluate(predictions_dt)
print(f"AUC for Decision Tree: {auc_dt}")
print(f"F1 Score for Decision Tree: {f1_score_dt}")

# Save the best fitted pipeline found by cross-validation
cv_model_dt.bestModel.write().overwrite().save("dbfs:/FileStore/pipelines/decision_tree_pipeline")


## Classification Pipeline 2 - Logistic Regression

In [0]:
# Logistic Regression Pipeline, sharing the `assembler` and `scaler` stages
# defined in the decision-tree cell.
lr = LogisticRegression(featuresCol="scaled_features", labelCol="high_fare")

pipeline_lr = Pipeline(stages=[assembler, scaler, lr])

# Hyperparameter tuning: 3 x 2 = 6 candidate models
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()

# Model selection is driven by area under the ROC curve
crossval_lr = CrossValidator(estimator=pipeline_lr,
                             estimatorParamMaps=paramGrid_lr,
                             evaluator=BinaryClassificationEvaluator(labelCol="high_fare", metricName="areaUnderROC"),
                             numFolds=3)

# Train model
cv_model_lr = crossval_lr.fit(train_df)

# Evaluate model with evaluators built in this cell, so it does not depend on
# evaluator objects created elsewhere. The binary evaluator only yields AUC;
# F1 ("f1" = Spark's weighted F1) comes from the hard `prediction` column.
predictions_lr = cv_model_lr.transform(test_df)
auc_lr = BinaryClassificationEvaluator(labelCol="high_fare", metricName="areaUnderROC").evaluate(predictions_lr)
f1_score_lr = MulticlassClassificationEvaluator(
    labelCol="high_fare", predictionCol="prediction", metricName="f1"
).evaluate(predictions_lr)
print(f"AUC for Logistic Regression: {auc_lr}")
print(f"F1 Score for Logistic Regression: {f1_score_lr}")

# Save the best fitted pipeline found by cross-validation
cv_model_lr.bestModel.write().overwrite().save("dbfs:/FileStore/pipelines/logistic_regression_pipeline")


## Regression - Feature Engineering

In [0]:
# Trip duration in minutes: difference of the two timestamps cast to long
# (seconds since the epoch), divided by 60.
pickup_secs = col("pickup_datetime").cast("long")
dropoff_secs = col("dropoff_datetime").cast("long")

# Keep only trips with a strictly positive duration; null durations also fail the filter.
df = df.withColumn("trip_duration", (dropoff_secs - pickup_secs) / 60) \
       .filter(col("trip_duration") > 0)

# Re-split the filtered data with the same 70/30 ratio and seed
train_df, test_df = df.randomSplit(weights=[0.7, 0.3], seed=42)


## Regression Pipeline 1 - Linear Regression

In [0]:
# Regression feature set: the classification predictors plus the engineered
# `trip_duration` column, which the shared classification `assembler` omits.
reg_assembler = VectorAssembler(
    inputCols=["trip_distance", "trip_duration", "pickup_hour", "day_of_week"],
    outputCol="features",
)

# Linear Regression Pipeline (`scaler` standardizes the "features" vector)
lr_reg = LinearRegression(featuresCol="scaled_features", labelCol="fare_amount")

pipeline_lr_reg = Pipeline(stages=[reg_assembler, scaler, lr_reg])

# Hyperparameter tuning: 2 x 2 = 4 candidate models
paramGrid_lr_reg = ParamGridBuilder() \
    .addGrid(lr_reg.regParam, [0.01, 0.1]) \
    .addGrid(lr_reg.maxIter, [50, 100]) \
    .build()

crossval_lr_reg = CrossValidator(estimator=pipeline_lr_reg,
                                 estimatorParamMaps=paramGrid_lr_reg,
                                 evaluator=RegressionEvaluator(labelCol="fare_amount", metricName="rmse"),
                                 numFolds=3)

cv_model_lr_reg = crossval_lr_reg.fit(train_df)

# Evaluate model on the held-out split (metric stated explicitly)
predictions_lr_reg = cv_model_lr_reg.transform(test_df)
rmse = RegressionEvaluator(labelCol="fare_amount", metricName="rmse").evaluate(predictions_lr_reg)
print(f"RMSE for Linear Regression: {rmse}")

# Save the best fitted pipeline found by cross-validation
cv_model_lr_reg.bestModel.write().overwrite().save("dbfs:/FileStore/pipelines/linear_regression_pipeline")


## Regression Pipeline 2 - Random Forest

In [0]:
# Regression feature set: the classification predictors plus the engineered
# `trip_duration` column, which the shared classification `assembler` omits.
reg_assembler = VectorAssembler(
    inputCols=["trip_distance", "trip_duration", "pickup_hour", "day_of_week"],
    outputCol="features",
)

# Random Forest Pipeline. Tree ensembles do not need scaling; the `scaler`
# stage is kept so every pipeline shares the same preprocessing.
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="fare_amount")

pipeline_rf = Pipeline(stages=[reg_assembler, scaler, rf])

# Hyperparameter tuning: 2 x 2 = 4 candidate forests
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=RegressionEvaluator(labelCol="fare_amount", metricName="rmse"),
                             numFolds=3)

cv_model_rf = crossval_rf.fit(train_df)

# Evaluate model on the held-out split (metric stated explicitly)
predictions_rf = cv_model_rf.transform(test_df)
rmse_rf = RegressionEvaluator(labelCol="fare_amount", metricName="rmse").evaluate(predictions_rf)
print(f"RMSE for Random Forest: {rmse_rf}")

# Save the best fitted pipeline found by cross-validation
cv_model_rf.bestModel.write().overwrite().save("dbfs:/FileStore/pipelines/random_forest_pipeline")


# Results and Discussion

## Classification
- Decision Tree F1 Score: [Insert Value]
- Logistic Regression F1 Score: [Insert Value]
- Best Model: [Insert Model]

## Regression
- Linear Regression RMSE: [Insert Value]
- Random Forest RMSE: [Insert Value]
- Best Model: [Insert Model]

[Add further observations, charts, and tuning results as required.]