In [0]:
!pip install pyspark

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.0 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 25.7 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488496 sha256=6d4c530b63d385a655149321c869858de1bb641efdf9820b8be335c2a82ef319
  Stored in directory: /home/spark-b245d5d4-d166-441a-a1e1-ab/.cache/pip/wheels/22/f3/c0/49d7c304ee9ebfd58d8417a140fa93a306ea3d28d19e9af018
Successf

In [0]:
(sc, spark) 

(<SparkContext master=spark://10.139.64.10:7077 appName=Databricks Shell>,
 <pyspark.sql.session.SparkSession at 0x7f27c9fa64d0>)

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("MLlib lab") \
    .config("spark.sql.parquet.enableVectorizedReader", "false") \
    .getOrCreate()
    
# swith the latest spark version to older one so that it tolerates some data format issues
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

""" 
in order to avoid "Parquet column cannot be converted" error, we need to disable vectorized reader when we have decimal values in our columns. 
refer to https://learn.microsoft.com/en-us/answers/questions/853861/parquet-column-cannot-be-converted for further info
"""
# spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false") 

sc = spark.sparkContext


In [0]:
%matplotlib inline
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Row

In [0]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, TimestampNTZType, LongType, DoubleType,IntegerType
"""
We have encountered "Parquet column cannot be converted" error. As a workaround we decided to loop through directory and ensure there is no column type mismatch by checking file by file.
"""

directory = '/mnt/2024-team14/'

# List all parquet files
parquet_files = dbutils.fs.ls(directory)
#print(parquet_files)
#weather_file = parquet_files.pop()
#taxi_lookup = parquet_files.pop()
#json = parquet_files.pop()
#traffic = parquet_files.pop()

# Loop through directory and check where there is a column type mismatch between files
"""prev_types, curr_types = None, None
mismatches = {}
for file in parquet_files: 
  if file.path == "/mnt/2024-team14/weather_data.csv": 
    df1 = spark.read.csv(path) 
    continue
  # Read the Parquet file with schema inference
  df = spark.read.parquet(file.path)

  if not prev_types and not curr_types:
    curr_types = df.dtypes
    continue
  
  prev_types = curr_types
  curr_types = df.dtypes

  # check each column
  for i in range(len(df.columns)):
    if prev_types[i] != curr_types[i]:
      if not file.name in mismatches:
        mismatches[file.name] = [(prev_types[i][0], (prev_types[i][1], curr_types[i][1]))]
      else:
        mismatches[file.name].append((prev_types[i][0], (prev_types[i][1], curr_types[i][1])))"""

# print(mismatches)
schema = StructType([
  StructField('hvfhs_license_num', StringType(), nullable=True), 
  StructField('dispatching_base_num', StringType(), nullable=True), 
  StructField('originating_base_num', StringType(), nullable=True), 
  StructField('request_datetime', TimestampNTZType(), nullable=True), 
  StructField('on_scene_datetime', TimestampNTZType(), nullable=True), 
  StructField('pickup_datetime', TimestampNTZType(), nullable=True), 
  StructField('dropoff_datetime', TimestampNTZType(), nullable=True), 
  StructField('PULocationID', LongType(), nullable=True), 
  StructField('DOLocationID', LongType(), nullable=True), 
  StructField('trip_miles', DoubleType(), nullable=True), 
  StructField('trip_time', LongType(), nullable=True), 
  StructField('base_passenger_fare', DoubleType(), nullable=True), 
  StructField('tolls', DoubleType(), nullable=True), 
  StructField('bcf', DoubleType(), nullable=True), 
  StructField('sales_tax', DoubleType(), nullable=True), 
  StructField('congestion_surcharge', DoubleType(), nullable=True), 
  StructField('airport_fee', IntegerType(), nullable=True), 
  StructField('tips', DoubleType(), nullable=True), 
  StructField('driver_pay', DoubleType(), nullable=True), 
  StructField('shared_request_flag', StringType(), nullable=True), 
  StructField('shared_match_flag', StringType(), nullable=True), 
  StructField('access_a_ride_flag', StringType(), nullable=True), 
  StructField('wav_request_flag', StringType(), nullable=True), 
  StructField('wav_match_flag', IntegerType(), nullable=True)
])
# Create empty dataframe
union_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# Process each Parquet file
for file in parquet_files:
  if file.path == "dbfs:/mnt/2024-team14/MTA_2020mar_2024apr.csv":
    df3 = spark.read.csv(file.path, header=True)
  if file.path == "dbfs:/mnt/2024-team14/weather_data.csv":
    df1 = spark.read.csv(file.path, header=True)
  if file.path == 'dbfs:/mnt/2024-team14/taxi_zone_lookup.csv':
    df2 = spark.read.csv(file.path, header=True)
  if "parquet" in file.path:  
    df = spark.read.parquet(file.path)
    # Read the Parquet file with schema inference
  #df = spark.read.parquet(file.path)

    # (column with mismatch, desirable type)
  mismatch_col = [
      ("wav_match_flag", "string"), 
      ("airport_fee", "double"), 
      ("PULocationID", "bigint"), 
      ("DOLocationID", "bigint")
    ]

  df = df.withColumns({c: F.col(c).cast(t) for c, t in mismatch_col})

    # Union the casted DataFrame with the union_df
  union_df = union_df.union(df)

# Revert the variable name after unioning the whole data
df = union_df

total_rows = df.count()
print(f"Total number of rows:{total_rows}")

Total number of rows:1094543412


In [0]:
df=df.withColumn("day",F.dayofyear('pickup_datetime'))
df=df.withColumn("hour",F.hour('pickup_datetime'))
#df=df.withColumn("month",F.month('pickup_datetime'))
df=df.withColumn("year",F.year('pickup_datetime'))
df=df.withColumn("Date",F.to_date('pickup_datetime'))
drop = ['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num', 'request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID', 'tolls', 'bcf', 'sales_tax', 'congestion_surcharge',
 'airport_fee', 'tips', 'driver_pay', 'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag']
df = df.drop(*drop)

In [0]:
import holidays
year_range = (2019, 2025)
hds = []
for y in range(year_range[0], year_range[1]):
  hds += holidays.US(state="NY", years=y).keys()

In [0]:
df = df.withColumns({
  "isWeekend": F.when(F.col("Date").isin(hds) | F.dayofweek(F.col("Date")).isin([1, 7]), 1).otherwise(0),
  "isOvernight": F.when(F.col("hour").isin(list(range(20, 24))+list(range(0, 6))), 1).otherwise(0)}) \
  .withColumn("isRushhour", 
              F.when(F.col("hour").isin(list(range(16, 20))) | (F.col("isWeekend") == 0), 1).otherwise(0)
              )

In [0]:
df=df.withColumn("trip_meters",F.col('trip_miles')*1609.35)

In [0]:
df.columns

['Date',
 'TMAX (Degrees Fahrenheit)',
 'TMIN (Degrees Fahrenheit)',
 'PRCP (Inches)',
 'SNOW (Inches)',
 'SNWD (Inches)',
 'PRCP (mm)',
 'SNOW (mm)',
 'SNWD (mm)']

In [0]:
df = df.filter(df.year==2024)
df = df.drop(*['Date','trip_miles','year'])

In [0]:
df =df.cache()

In [0]:
from pyspark.sql.functions import rand
# Set the random seed (for reproducibility)
seed = 42
# Split the DataFrame into training and test sets using a fixed random split
train_df, test_df = df.randomSplit([0.7, 0.3], seed=seed)
# Print the sizes of the training and test sets
print(f"Training Data Size: {train_df.count()}")
print(f"Test Data Size: {test_df.count()}")

com.databricks.backend.common.rpc.DriverStoppedException: Driver down cause: driver state change (exit code: 137)
	at com.databricks.spark.chauffeur.ChauffeurState.processDriverStateChange(ChauffeurState.scala:465)
	at com.databricks.spark.chauffeur.Chauffeur.onDriverStateChange(Chauffeur.scala:1354)
	at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1(Chauffeur.scala:180)
	at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1$adapted(Chauffeur.scala:180)
	at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4(DriverDaemonMonitorImpl.scala:210)
	at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4$adapted(DriverDaemonMonitorImpl.scala:210)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.goToStopped(DriverDaemonMonitorImpl.scala:210)
	at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.monitorDriver(DriverDaemonMonitorImpl.s

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse",labelCol='base_passenger_fare')
from pyspark.ml import Pipeline

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler3 = VectorAssembler(
    inputCols=['trip_time', 'day', 'isWeekend', 'isOvernight', 'isRushhour', 'trip_meters'],
    outputCol="features",
    handleInvalid="skip"
    )

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
#vectorAssembler3.transform(train_df).show()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
from pyspark.ml.regression import GBTRegressor
lr = GBTRegressor(maxDepth=5,labelCol="base_passenger_fare", featuresCol="features")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
import time
start = time.time()
lr_pipeline = Pipeline(stages=[vectorAssembler3, lr])
lr_pipeline_model = lr_pipeline.fit(train_df)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
lr_predictions = lr_pipeline_model.transform(test_df)
rmse = evaluator.evaluate(lr_predictions)
print(f"The Root Mean Squared Error: {rmse}")
end = time.time()
print(f"Time Required to run the model: {end-start} seconds")

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
#lr_pipeline_model.transform(test_df).show()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite

In [0]:
partitions = [1,2,4,8,16,32,64,128,256,512]
for i in partitions:
  start_time = time.time()
  train_df_repartition= train_df.repartition(i).cache()
  lr_pipeline_model = lr_pipeline.fit(train_df)
  lr_predictions = lr_pipeline_model.transform(test_df)
  end_time= time.time()
  with open("Global_Ensemble_gbt.csv", "a") as f:
        print(
              f"{i},{evaluator.evaluate(lr_predictions)},{end - start}",
              file=f,
              )

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.abortRunningSequence(ExecContextState.scala:757)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$shutdown$4(ExecContextState.scala:455)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.Ite