In [1]:
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Create SparkSession in local mode
spark = SparkSession.builder \
    .appName("FareTipModeling") \
    .master("local[*]") \
    .getOrCreate()


In [2]:
folder_path = Path(r"D:\L4S2\Big Data\assignment\NYC_Taxi_Trip_Data_Analysis\data\cleaned")

parquet_files = sorted(folder_path.glob("*.parquet"))
print(f"Found {len(parquet_files)} parquet files:")
for f in parquet_files:
    print(f.name)


Found 8 parquet files:
part-00000-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00001-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00002-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00003-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00004-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00005-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00006-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
part-00007-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet


In [3]:
def filter_valid_fares_tips(df):
    filtered_df = df.filter((df.fare_amount > 0) & (df.tip_amount >= 0))
    print(f"Filtered from {df.count()} rows to {filtered_df.count()} rows with valid fare and tip amounts")
    return filtered_df


In [4]:
for file_path in parquet_files:
    print(f"\nProcessing file: {file_path.name}")
    df = spark.read.parquet(str(file_path))
    
    # Run your filter
    filtered_df = filter_valid_fares_tips(df)
    
    # You can add more processing steps here (e.g. modeling)



Processing file: part-00000-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 3918134 rows to 3918134 rows with valid fare and tip amounts

Processing file: part-00001-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 3933455 rows to 3933455 rows with valid fare and tip amounts

Processing file: part-00002-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 3928190 rows to 3928190 rows with valid fare and tip amounts

Processing file: part-00003-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 3936382 rows to 3936382 rows with valid fare and tip amounts

Processing file: part-00004-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 3930808 rows to 3930808 rows with valid fare and tip amounts

Processing file: part-00005-c267671b-1832-48c2-97b7-10a9827a836f-c000.snappy.parquet
Filtered from 5205187 rows to 5205187 rows with valid fare and tip amounts

Processing file: part-00006-c2676

In [5]:
df = spark.read.parquet(str(parquet_files[0]))
print(df.columns)


['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee']


In [6]:
#calculate the total size of the data set
from pathlib import Path

folder_path = Path(r"D:\L4S2\Big Data\assignment\NYC_Taxi_Trip_Data_Analysis\data\cleaned")

total_size_bytes = sum(f.stat().st_size for f in folder_path.glob("*.parquet"))
total_size_mb = total_size_bytes / (1024 * 1024)

print(f"📦 Total dataset size: {total_size_mb:.2f} MB")


📦 Total dataset size: 624.47 MB


In [7]:
# 📄 Load and combine all Parquet files into one DataFrame
df_all = spark.read.parquet(*[str(p) for p in parquet_files])

# 🎯 Select 50 sample values from the tip_amount column
print("🎯 Sample 50 values from 'tip_amount' column:")
df_all.select("tip_amount").limit(50).show(50, truncate=False)


🎯 Sample 50 values from 'tip_amount' column:
+----------+
|tip_amount|
+----------+
|0.0       |
|1.0       |
|1.0       |
|0.0       |
|3.0       |
|3.42      |
|2.08      |
|2.72      |
|4.0       |
|2.0       |
|0.1       |
|3.84      |
|0.0       |
|6.64      |
|3.84      |
|2.44      |
|0.0       |
|2.13      |
|2.0       |
|2.44      |
|2.08      |
|3.28      |
|0.0       |
|4.68      |
|5.0       |
|2.0       |
|0.0       |
|4.26      |
|3.4       |
|4.26      |
|1.85      |
|0.0       |
|0.0       |
|16.15     |
|2.0       |
|2.1       |
|2.0       |
|4.3       |
|2.0       |
|0.0       |
|0.0       |
|5.8       |
|4.1       |
|1.0       |
|1.0       |
|3.4       |
|0.0       |
|3.15      |
|2.45      |
|7.6       |
+----------+



In [12]:
# Drop 'total_amount' if it exists to avoid data leakage
if 'total_amount' in filtered_df.columns:
    filtered_df = filtered_df.drop('total_amount')

# Select only numeric feature columns and drop target
feature_cols = [col for col in filtered_df.columns if col != 'tip_amount']
target_col = 'tip_amount'


Feature Engineering I


In [20]:
from pyspark.sql.functions import hour, dayofweek, unix_timestamp, col

# Add time-based features
engineered_df = filtered_df \
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \
    .withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime"))) \
    .withColumn("trip_duration_minutes", 
                (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60)

# Optional: Drop original timestamp fields
engineered_df = engineered_df.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")


In [21]:
from pyspark.sql.functions import when

engineered_df = engineered_df.withColumn("store_and_fwd_flag_numeric",
                                         when(col("store_and_fwd_flag") == "Y", 1).otherwise(0))

# Drop original string column
engineered_df = engineered_df.drop("store_and_fwd_flag")


In [22]:
from pyspark.sql.types import NumericType

numeric_cols = [f.name for f in engineered_df.schema.fields 
                if isinstance(f.dataType, NumericType) and f.name != "tip_amount"]

print("Final feature columns:")
print(numeric_cols)

assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
assembled_df = assembler.transform(engineered_df).select("features", "tip_amount")


Final feature columns:
['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'Airport_fee', 'pickup_hour', 'pickup_dayofweek', 'trip_duration_minutes', 'store_and_fwd_flag_numeric']


In [23]:
train_data, test_data = assembled_df.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="tip_amount")
lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="tip_amount", predictionCol="prediction", metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print(f"New RMSE with engineered features = {rmse:.2f}")


New RMSE with engineered features = 2.57


Feature Engineering II

In [14]:
from pyspark.sql.types import NumericType

# Get only numeric columns (exclude timestamps, strings, etc.)
numeric_cols = [f.name for f in filtered_df.schema.fields
                if isinstance(f.dataType, NumericType) and f.name != 'tip_amount']

print("Using these numeric columns for features:")
print(numeric_cols)

# Assemble features into 'features' column
assembler = VectorAssembler(inputCols=numeric_cols, outputCol='features')
assembled_df = assembler.transform(filtered_df).select('features', 'tip_amount')


Using these numeric columns for features:
['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'Airport_fee']


In [15]:
train_data, test_data = assembled_df.randomSplit([0.8, 0.2], seed=42)
print(f"Training rows: {train_data.count()}, Testing rows: {test_data.count()}")


Training rows: 3108808, Testing rows: 777808


In [16]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='tip_amount')
lr_model = lr.fit(train_data)


In [17]:
predictions = lr_model.transform(test_data)
predictions.select("features", "tip_amount", "prediction").show(5)


+--------------------+----------+-------------------+
|            features|tip_amount|         prediction|
+--------------------+----------+-------------------+
|[1.0,1.0,0.1,1.0,...|       0.0|-5.4452204766350185|
|[1.0,1.0,0.1,1.0,...|       0.0| -2.112436338648564|
|[1.0,1.0,0.1,1.0,...|       0.0| -2.020170053631144|
|[1.0,1.0,0.1,1.0,...|       0.0| -4.227821951385367|
|[1.0,1.0,0.1,1.0,...|       0.0| -7.086102416301555|
+--------------------+----------+-------------------+
only showing top 5 rows


In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol='tip_amount', predictionCol='prediction', metricName='rmse')

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse:.2f}")


Root Mean Squared Error (RMSE) on test data = 2.57
