##Getting Data

In [1]:
# ! pip install kaggle
# ! mkdir ~/.kaggle
# ! cp kaggle.json ~/.kaggle/
# ! chmod 600 ~/.kaggle/kaggle.json
# ! kaggle competitions download nyc-taxi-trip-duration
# ! unzip nyc-taxi-trip-duration.zip
# ! unzip test.zip
# ! unzip train.zip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
mkdir: cannot create directory ‘/root/.kaggle’: File exists
Downloading nyc-taxi-trip-duration.zip to /content
100% 85.8M/85.8M [00:03<00:00, 30.2MB/s]
100% 85.8M/85.8M [00:03<00:00, 23.0MB/s]
Archive:  nyc-taxi-trip-duration.zip
  inflating: sample_submission.zip   
  inflating: test.zip                
  inflating: train.zip               
Archive:  test.zip
  inflating: test.csv                
Archive:  train.zip
  inflating: train.csv               


##Spark Installation

In [2]:
#!pip install pyspark
import pyspark
print(pyspark.__version__)
# a SparkSession object can perform the most common data processing tasks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate() # will return existing session if one was
                                                           # created before and was not closed
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=0a6b40e02e4460a24e2acdcde8203825d923828eba105c5906993bae638e51d7
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
3.4.0


# Read data

In [3]:
import pandas as pd
import numpy as np

In [4]:
# read train.csv
train_df = pd.read_csv("train.csv")
train_df.sample(10)

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
1361413,id1825003,1,2016-01-30 10:40:06,2016-01-30 10:54:39,1,-73.959587,40.777054,-73.974922,40.741737,N,873
273593,id0124704,2,2016-06-24 00:09:24,2016-06-24 00:17:13,1,-73.987373,40.741508,-73.974632,40.75824,N,469
167159,id3237988,1,2016-05-23 21:58:02,2016-05-23 22:04:56,1,-73.982346,40.769726,-73.988686,40.763264,N,414
498623,id2852423,1,2016-05-30 17:47:42,2016-05-30 17:49:47,3,-73.991264,40.749992,-73.996895,40.742409,N,125
927494,id1485370,2,2016-05-02 14:09:20,2016-05-02 14:14:06,1,-73.974251,40.75399,-73.982613,40.742371,N,286
966595,id1868866,1,2016-01-22 06:09:54,2016-01-22 06:15:35,2,-73.991745,40.756802,-73.978226,40.755356,N,341
683183,id3928475,2,2016-02-01 14:27:19,2016-02-01 14:32:56,1,-73.991539,40.757969,-73.978416,40.752911,N,337
77365,id1582144,1,2016-02-17 01:23:47,2016-02-17 01:28:54,2,-73.987007,40.729519,-74.003563,40.730022,N,307
436951,id3773636,2,2016-03-25 22:30:51,2016-03-25 23:02:03,2,-73.788765,40.642941,-73.976334,40.73595,N,1872
325421,id3241533,1,2016-02-12 17:33:20,2016-02-12 17:49:32,1,-73.992317,40.728962,-73.992516,40.728825,N,972


In [5]:
# explore train.csv
train_df.dtypes

id                     object
vendor_id               int64
pickup_datetime        object
dropoff_datetime       object
passenger_count         int64
pickup_longitude      float64
pickup_latitude       float64
dropoff_longitude     float64
dropoff_latitude      float64
store_and_fwd_flag     object
trip_duration           int64
dtype: object

# preprocessing data

In [6]:
#Check for N/A values.
print(train_df.isnull().sum())

id                    0
vendor_id             0
pickup_datetime       0
dropoff_datetime      0
passenger_count       0
pickup_longitude      0
pickup_latitude       0
dropoff_longitude     0
dropoff_latitude      0
store_and_fwd_flag    0
trip_duration         0
dtype: int64


In [7]:
def preprocessing(data_df):
    # remove any rows with missing data
    data_df = data_df.dropna()

    # convert pickup and dropoff datetime to datetime format
    data_df['pickup_datetime'] = pd.to_datetime(data_df['pickup_datetime'])
    data_df['dropoff_datetime'] = pd.to_datetime(data_df['dropoff_datetime'])

    # split datetime into month, day, week and hour
    data_df['day'] = data_df['pickup_datetime'].dt.day
    data_df['hour'] = data_df['pickup_datetime'].dt.hour
    data_df['month'] = data_df['pickup_datetime'].dt.month
    data_df['week'] = data_df['pickup_datetime'].dt.dayofweek
    return data_df


In [8]:
train_df = preprocessing(train_df)
train_df.sample(10)

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,day,hour,month,week
941870,id3190871,2,2016-03-10 18:12:19,2016-03-10 18:17:25,6,-73.970551,40.757359,-73.959068,40.763672,N,306,10,18,3,3
71996,id3483873,2,2016-02-05 13:19:32,2016-02-05 13:40:40,2,-73.992554,40.749641,-74.012383,40.713829,N,1268,5,13,2,4
44787,id1574570,2,2016-03-21 14:53:58,2016-03-21 15:34:07,1,-73.776802,40.645466,-73.978256,40.748501,N,2409,21,14,3,0
936144,id0319745,2,2016-06-27 19:12:15,2016-06-27 19:21:29,3,-73.990692,40.725861,-74.006714,40.714371,N,554,27,19,6,0
1043245,id2536332,1,2016-02-04 19:35:32,2016-02-04 19:41:02,1,-73.986275,40.734715,-73.979843,40.741451,N,330,4,19,2,3
1236903,id2934619,1,2016-01-03 00:29:40,2016-01-03 00:41:51,1,-73.990997,40.739815,-73.954231,40.786953,N,731,3,0,1,6
829001,id2778348,2,2016-02-08 00:39:37,2016-02-08 00:49:25,2,-74.005653,40.711578,-73.992119,40.749184,N,588,8,0,2,0
1445904,id2023223,1,2016-06-24 18:02:33,2016-06-24 18:10:03,1,-73.982468,40.731018,-73.991173,40.735001,N,450,24,18,6,4
912702,id3179309,2,2016-04-08 11:14:40,2016-04-08 11:53:57,1,-73.992111,40.724991,-73.975121,40.763489,N,2357,8,11,4,4
462077,id1721029,1,2016-01-16 18:06:12,2016-01-16 18:18:19,1,-73.980865,40.730755,-73.983986,40.746037,N,727,16,18,1,5


In [9]:
# Get important information
print('Trip duration in seconds: {} to {}'.format(train_df.trip_duration.min(), train_df.trip_duration.max()))

# Get passenger count range 
print('Passengers: {} to {}'.format(train_df.passenger_count.min(), train_df.passenger_count.max()))

Trip duration in seconds: 1 to 3526282
Passengers: 0 to 9


In [10]:
# remove outliers (Time and passenger count)
# < 1 min or > 3 hours

train_df = train_df[~((train_df.trip_duration < 60) | (train_df.trip_duration > 3600*3))]

# checking Trip duration
print('Trip duration in seconds: {} to {}'.format(train_df.trip_duration.min(), train_df.trip_duration.max()))

# dropping trips with passenger count = 0
print('Empty trips: {}'.format(train_df[train_df.passenger_count == 0].shape[0]))
df_train = train_df[train_df.passenger_count > 0]

Trip duration in seconds: 60 to 10731
Empty trips: 17


##Feature Extraction

In [11]:
# Define a UDF to calculate distance between two points using the Haversine formula
import numpy as np

def haversine_distance(lat1, lng1, lat2, lng2):
    R = 6371  # Earth's radius in kilometers

    # Convert latitude and longitude values to radians
    lat1, lng1, lat2, lng2 = np.radians([lat1, lng1, lat2, lng2])

    # Calculate the differences between the two points
    dlat = lat2 - lat1
    dlng = lng2 - lng1

    # Apply the Haversine formula
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlng / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    h = R * c

    return h


def manhattan_distance(lat1, lng1, lat2, lng2):
  return abs(lng1-lng2)+abs(lat1-lat2)


def calculate_direction(lat1, lng1, lat2, lng2):
    # Convert coordinates to radians
    lat1, lng1, lat2, lng2 = np.radians([lat1, lng1, lat2, lng2])

    # Calculate the difference in longitude
    lng_delta = lng2 - lng1

    # Calculate the y and x components of the direction vector
    y = np.sin(lng_delta) * np.cos(lat2)
    x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(lng_delta)

    # Calculate the direction in degrees
    direction_degrees = np.degrees(np.arctan2(y, x))

    return direction_degrees


In [12]:
# compute haversine distance
distance1 = haversine_distance(train_df.pickup_latitude.values, train_df.pickup_longitude.values, train_df.dropoff_latitude.values, train_df.dropoff_longitude.values)

# compute manhattan distance
distance2 = manhattan_distance(train_df.pickup_latitude.values, train_df.pickup_longitude.values, train_df.dropoff_latitude.values, train_df.dropoff_longitude.values)

# compute direction
distance3 = calculate_direction(train_df.pickup_latitude.values, train_df.pickup_longitude.values, train_df.dropoff_latitude.values, train_df.dropoff_longitude.values)
# add haversine_distance / manhattan distance / direction columns to train_df
train_df['haversine_distance'] = distance1
train_df['manhattan_distance'] = distance2
train_df['direction'] = distance3

In [13]:
train_df.sample(10)

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,day,hour,month,week,haversine_distance,manhattan_distance,direction
874014,id1717065,2,2016-02-13 05:56:44,2016-02-13 06:37:10,5,-73.963516,40.775387,-73.780884,40.644878,N,2426,13,5,2,5,21.155616,0.313141,133.25142
39884,id0544563,2,2016-02-03 23:52:03,2016-02-04 00:16:45,1,-73.789642,40.643391,-73.92038,40.746799,N,1482,3,23,2,2,15.928023,0.234146,-43.745341
928185,id0602164,2,2016-03-22 13:53:31,2016-03-22 14:03:31,5,-73.964378,40.773708,-73.97541,40.78717,N,600,22,13,3,1,1.7617,0.024494,-31.817525
1384266,id3667418,1,2016-03-03 15:35:24,2016-03-03 15:46:51,2,-73.972733,40.755539,-73.954674,40.765312,N,687,3,15,3,3,1.86933,0.027832,54.448495
520929,id1662731,2,2016-05-09 13:30:29,2016-05-09 13:53:17,6,-73.960197,40.766258,-73.987511,40.748737,N,1368,9,13,5,0,3.014645,0.044834,-130.251069
392870,id0510458,1,2016-04-27 19:34:16,2016-04-27 19:41:08,2,-73.967323,40.796307,-73.953606,40.785458,N,412,27,19,4,2,1.670005,0.024567,136.24562
202144,id3881180,1,2016-06-05 21:17:04,2016-06-05 21:27:43,2,-73.946396,40.772491,-73.974678,40.787434,N,639,5,21,6,6,2.903678,0.043224,-55.086777
1409818,id2867279,1,2016-03-09 15:20:22,2016-03-09 15:42:33,1,-73.971962,40.749756,-73.995499,40.733662,N,1331,9,15,3,2,2.671068,0.039631,-132.058775
245990,id1128555,1,2016-03-03 19:27:20,2016-03-03 19:37:06,1,-73.960091,40.807701,-73.980644,40.782688,N,586,3,19,3,3,3.275564,0.045567,-148.10833
1445178,id0506115,2,2016-05-24 19:15:09,2016-05-24 19:22:26,6,-73.970398,40.765167,-73.953178,40.775116,N,437,24,19,5,1,1.823882,0.027168,52.654996


##Convert categorical data

In [14]:
# Define numerical and categorical columns
numerical_cols = ['haversine_distance', 'manhattan_distance', 'direction']
categorical_cols = ['vendor_id', 'passenger_count', 'store_and_fwd_flag', 'day', 'hour', 'month', 'week']

# Encode categorical data into numerical data using one-hot encoding
def encode_categorical(df):
    for categorical in categorical_cols:
        encoded_cat = pd.get_dummies(df[categorical], prefix=categorical, prefix_sep='_')
        df = (df.drop([categorical], axis=1)).join(encoded_cat)
    return df
# Apply encoding to training and test data
train_df = encode_categorical(train_df)

In [15]:
train_df.sample(10)

Unnamed: 0,id,pickup_datetime,dropoff_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,trip_duration,haversine_distance,manhattan_distance,...,month_4,month_5,month_6,week_0,week_1,week_2,week_3,week_4,week_5,week_6
1188139,id2615994,2016-03-13 04:04:18,2016-03-13 04:33:35,-73.994087,40.750977,-73.946404,40.813927,1757,8.069376,0.110634,...,0,0,0,0,0,0,0,0,0,1
587617,id3272638,2016-03-22 16:37:03,2016-03-22 16:49:16,-73.982254,40.780525,-73.946609,40.792316,733,3.274873,0.047436,...,0,0,0,0,1,0,0,0,0,0
672875,id0874920,2016-04-13 14:40:56,2016-04-13 14:47:00,-74.007042,40.727665,-74.00782,40.739372,364,1.303443,0.012486,...,1,0,0,0,0,1,0,0,0,0
754984,id2970947,2016-05-25 07:31:29,2016-05-25 07:55:25,-73.862839,40.768791,-73.790672,40.646614,1436,14.885152,0.194344,...,0,1,0,0,0,1,0,0,0,0
1203345,id1864703,2016-05-04 15:55:17,2016-05-04 16:38:33,-73.97541,40.754807,-73.861893,40.768414,2596,9.67975,0.127125,...,0,1,0,0,0,1,0,0,0,0
223545,id2766434,2016-04-01 15:17:42,2016-04-01 15:28:27,-74.015297,40.716148,-74.007851,40.74699,645,3.486377,0.038288,...,1,0,0,0,0,0,0,1,0,0
67694,id2670251,2016-06-05 18:21:21,2016-06-05 18:26:16,-73.962799,40.758694,-73.959305,40.771687,295,1.474407,0.016487,...,0,0,1,0,0,0,0,0,0,1
412589,id3875377,2016-03-14 11:24:47,2016-03-14 11:35:48,-73.999695,40.733353,-73.983849,40.746727,661,1.998496,0.029221,...,0,0,0,1,0,0,0,0,0,0
26260,id2112914,2016-04-26 11:23:03,2016-04-26 11:27:24,-74.003677,40.726429,-73.995369,40.739391,261,1.602361,0.021271,...,1,0,0,0,1,0,0,0,0,0
962638,id2200808,2016-02-24 19:08:07,2016-02-24 19:21:52,-73.956688,40.771305,-73.974319,40.736801,825,4.114077,0.052135,...,0,0,0,0,0,1,0,0,0,0


In [16]:
train_df.columns

Index(['id', 'pickup_datetime', 'dropoff_datetime', 'pickup_longitude',
       'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
       'trip_duration', 'haversine_distance', 'manhattan_distance',
       'direction', 'vendor_id_1', 'vendor_id_2', 'passenger_count_0',
       'passenger_count_1', 'passenger_count_2', 'passenger_count_3',
       'passenger_count_4', 'passenger_count_5', 'passenger_count_6',
       'passenger_count_8', 'passenger_count_9', 'store_and_fwd_flag_N',
       'store_and_fwd_flag_Y', 'day_1', 'day_2', 'day_3', 'day_4', 'day_5',
       'day_6', 'day_7', 'day_8', 'day_9', 'day_10', 'day_11', 'day_12',
       'day_13', 'day_14', 'day_15', 'day_16', 'day_17', 'day_18', 'day_19',
       'day_20', 'day_21', 'day_22', 'day_23', 'day_24', 'day_25', 'day_26',
       'day_27', 'day_28', 'day_29', 'day_30', 'day_31', 'hour_0', 'hour_1',
       'hour_2', 'hour_3', 'hour_4', 'hour_5', 'hour_6', 'hour_7', 'hour_8',
       'hour_9', 'hour_10', 'hour_11', 'hour_12', 'h

In [17]:
dftrainNew = train_df.drop(['id', 'pickup_datetime', 'dropoff_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'pickup_datetime'], axis=1)

In [18]:
dftrainNew.columns

Index(['trip_duration', 'haversine_distance', 'manhattan_distance',
       'direction', 'vendor_id_1', 'vendor_id_2', 'passenger_count_0',
       'passenger_count_1', 'passenger_count_2', 'passenger_count_3',
       'passenger_count_4', 'passenger_count_5', 'passenger_count_6',
       'passenger_count_8', 'passenger_count_9', 'store_and_fwd_flag_N',
       'store_and_fwd_flag_Y', 'day_1', 'day_2', 'day_3', 'day_4', 'day_5',
       'day_6', 'day_7', 'day_8', 'day_9', 'day_10', 'day_11', 'day_12',
       'day_13', 'day_14', 'day_15', 'day_16', 'day_17', 'day_18', 'day_19',
       'day_20', 'day_21', 'day_22', 'day_23', 'day_24', 'day_25', 'day_26',
       'day_27', 'day_28', 'day_29', 'day_30', 'day_31', 'hour_0', 'hour_1',
       'hour_2', 'hour_3', 'hour_4', 'hour_5', 'hour_6', 'hour_7', 'hour_8',
       'hour_9', 'hour_10', 'hour_11', 'hour_12', 'hour_13', 'hour_14',
       'hour_15', 'hour_16', 'hour_17', 'hour_18', 'hour_19', 'hour_20',
       'hour_21', 'hour_22', 'hour_23', 'month

In [19]:
dftrainNew.sample(10)

Unnamed: 0,trip_duration,haversine_distance,manhattan_distance,direction,vendor_id_1,vendor_id_2,passenger_count_0,passenger_count_1,passenger_count_2,passenger_count_3,...,month_4,month_5,month_6,week_0,week_1,week_2,week_3,week_4,week_5,week_6
238706,416,2.246501,0.033459,-128.300704,0,1,0,0,1,0,...,0,0,0,1,0,0,0,0,0,0
417361,849,1.887158,0.02705,37.083992,1,0,0,1,0,0,...,1,0,0,0,0,1,0,0,0,0
1056586,1011,4.554801,0.041077,179.878613,1,0,0,1,0,0,...,0,1,0,0,0,0,0,0,1,0
938653,713,1.907945,0.028385,50.098463,1,0,0,1,0,0,...,0,0,0,1,0,0,0,0,0,0
365936,1595,2.689906,0.038807,38.463178,0,1,0,0,1,0,...,0,0,0,0,0,1,0,0,0,0
93248,1061,7.382611,0.089973,17.763954,0,1,0,1,0,0,...,0,0,0,0,0,0,1,0,0,0
1227847,769,1.353701,0.019836,-42.467973,1,0,0,1,0,0,...,0,0,1,0,0,0,1,0,0,0
455604,851,0.981921,0.011925,-17.47876,0,1,0,1,0,0,...,0,0,0,0,0,0,0,1,0,0
437054,876,2.218435,0.032867,-133.070883,1,0,0,1,0,0,...,0,0,0,0,0,0,0,1,0,0
1321602,5264,20.86706,0.310631,127.166502,0,1,0,1,0,0,...,1,0,0,0,0,0,1,0,0,0


##saving final features 

In [20]:
dftrainNew.to_csv('data.csv', index=False)

#Model

##Convert to spark

In [21]:
dfspark = spark.read.option('header','true').csv('data.csv')
from pyspark.sql.functions import col

# cast all columns to float
for col_name in dfspark.columns:
    dfspark = dfspark.withColumn(col_name, col(col_name).cast('float'))


In [22]:
dfspark.show(3)

+-------------+------------------+------------------+----------+-----------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+--------------------+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+------+------+------+------+------+------+
|trip_duration|haversine_distance|manhattan_distance| direction|vendor_id_1|vendor_id_2|passenger_count_0|passenger_count_1|passenger_count_2|passenger_count_3|passenger_count_4|passenger_count_5|passenger_count_6|passen

In [23]:
# devide dataset to training features and target

X_column_names = dfspark.columns
target_colum_name = ['trip_duration']

# remove 'trip_duration' from X_column_names
X_column_names.remove('trip_duration')

print(X_column_names)


['haversine_distance', 'manhattan_distance', 'direction', 'vendor_id_1', 'vendor_id_2', 'passenger_count_0', 'passenger_count_1', 'passenger_count_2', 'passenger_count_3', 'passenger_count_4', 'passenger_count_5', 'passenger_count_6', 'passenger_count_8', 'passenger_count_9', 'store_and_fwd_flag_N', 'store_and_fwd_flag_Y', 'day_1', 'day_2', 'day_3', 'day_4', 'day_5', 'day_6', 'day_7', 'day_8', 'day_9', 'day_10', 'day_11', 'day_12', 'day_13', 'day_14', 'day_15', 'day_16', 'day_17', 'day_18', 'day_19', 'day_20', 'day_21', 'day_22', 'day_23', 'day_24', 'day_25', 'day_26', 'day_27', 'day_28', 'day_29', 'day_30', 'day_31', 'hour_0', 'hour_1', 'hour_2', 'hour_3', 'hour_4', 'hour_5', 'hour_6', 'hour_7', 'hour_8', 'hour_9', 'hour_10', 'hour_11', 'hour_12', 'hour_13', 'hour_14', 'hour_15', 'hour_16', 'hour_17', 'hour_18', 'hour_19', 'hour_20', 'hour_21', 'hour_22', 'hour_23', 'month_1', 'month_2', 'month_3', 'month_4', 'month_5', 'month_6', 'week_0', 'week_1', 'week_2', 'week_3', 'week_4', 'wee

In [24]:
# convert feature columns into a columns where the vlues are feature vectors
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Fvec')
df = v_asmblr.transform(dfspark)
X = df.select(['Fvec','trip_duration'])
X.show(3)

+--------------------+-------------+
|                Fvec|trip_duration|
+--------------------+-------------+
|(84,[0,1,2,4,6,14...|        455.0|
|(84,[0,1,2,3,6,14...|        663.0|
|(84,[0,1,2,4,6,14...|       2124.0|
+--------------------+-------------+
only showing top 3 rows



In [25]:
X.tail(3)

[Row(Fvec=SparseVector(84, {0: 7.8246, 1: 0.1067, 2: -150.7885, 4: 1.0, 6: 1.0, 14: 1.0, 37: 1.0, 53: 1.0, 74: 1.0, 81: 1.0}), trip_duration=764.0),
 Row(Fvec=SparseVector(84, {0: 1.0926, 1: 0.0155, 2: 35.0333, 3: 1.0, 6: 1.0, 14: 1.0, 20: 1.0, 62: 1.0, 71: 1.0, 78: 1.0}), trip_duration=373.0),
 Row(Fvec=SparseVector(84, {0: 1.134, 1: 0.0156, 2: 29.9695, 3: 1.0, 6: 1.0, 14: 1.0, 20: 1.0, 61: 1.0, 74: 1.0, 78: 1.0}), trip_duration=198.0)]

In [26]:
# Split the DataFrame into training and testing sets
train_df, test_df = X.randomSplit([0.8,0.2])

In [27]:
train_df.tail(3)

[Row(Fvec=SparseVector(84, {4: 1.0, 11: 1.0, 14: 1.0, 41: 1.0, 66: 1.0, 75: 1.0, 80: 1.0}), trip_duration=435.0),
 Row(Fvec=SparseVector(84, {4: 1.0, 11: 1.0, 14: 1.0, 43: 1.0, 65: 1.0, 75: 1.0, 82: 1.0}), trip_duration=1208.0),
 Row(Fvec=SparseVector(84, {4: 1.0, 11: 1.0, 14: 1.0, 43: 1.0, 68: 1.0, 74: 1.0, 80: 1.0}), trip_duration=992.0)]

##1- Linear regression

In [28]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'Fvec', labelCol='trip_duration', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [65.83326522113298,2904.077775383094,0.08487183876882207,-0.2565178900448294,0.2565178900411483,0.0,-11.51321679118828,15.876930168560515,19.75007242085877,31.851409407957537,-6.4216058435391865,-2.4591820711771444,-615.261711105876,0.0,-38.26093445121344,38.26093445111288,-21.497924570870552,-22.301166972517976,-8.327403003020844,-11.419587488667382,-11.999220251178365,-10.806555827402507,-23.100261254854008,-0.6876782469698036,-5.847645914185283,-12.80572974219198,-5.585967066155194,9.38702594513902,5.5332318104481555,9.881170621046744,-5.98174198639674,7.615720046510708,2.0337504569921014,-0.0,1.2332005024155652,3.1062648413940352,12.919656513449947,9.87480954139377,2.710443322791042,0.0,9.112609536654343,28.73362404704679,2.0300921385200663,-13.633710294514138,-8.248532391305273,-20.025597183456416,0.0,-97.61681147612286,-125.5990776039402,-147.09326535223178,-170.77828631548851,-228.3754080940505,-333.0820952121945,-249.6473752908115,-86.78976485909054,29.92242093164

In [29]:
lr_model.evaluate(test_df).predictions.tail(20)

[Row(Fvec=SparseVector(84, {4: 1.0, 7: 1.0, 14: 1.0, 44: 1.0, 59: 1.0, 72: 1.0, 77: 1.0}), trip_duration=552.0, prediction=494.71855896182535),
 Row(Fvec=SparseVector(84, {4: 1.0, 8: 1.0, 14: 1.0, 31: 1.0, 69: 1.0, 71: 1.0, 82: 1.0}), trip_duration=67.0, prediction=362.83864137552223),
 Row(Fvec=SparseVector(84, {4: 1.0, 8: 1.0, 14: 1.0, 41: 1.0, 48: 1.0, 73: 1.0, 82: 1.0}), trip_duration=527.0, prediction=349.52874566718225),
 Row(Fvec=SparseVector(84, {4: 1.0, 8: 1.0, 14: 1.0, 43: 1.0, 59: 1.0, 76: 1.0, 78: 1.0}), trip_duration=287.0, prediction=643.0424031303575),
 Row(Fvec=SparseVector(84, {4: 1.0, 9: 1.0, 14: 1.0, 32: 1.0, 66: 1.0, 75: 1.0, 78: 1.0}), trip_duration=289.0, prediction=570.3973344198776),
 Row(Fvec=SparseVector(84, {4: 1.0, 10: 1.0, 14: 1.0, 16: 1.0, 59: 1.0, 74: 1.0, 81: 1.0}), trip_duration=244.0, prediction=594.1559183481381),
 Row(Fvec=SparseVector(84, {4: 1.0, 10: 1.0, 14: 1.0, 19: 1.0, 55: 1.0, 73: 1.0, 81: 1.0}), trip_duration=627.0, prediction=517.50969888017

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

# assuming you have already trained a linear regression model called 'lr_model' on your training data

# make predictions on the test data
predictions = lr_model.transform(test_df)

# create an evaluator for regression problems and set the label and prediction columns
evaluator = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='rmse')

# evaluate the model's root mean squared error (RMSE) on the test data
rmse = evaluator.evaluate(predictions)

# print the RMSE
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")


Root Mean Squared Error (RMSE) on test data = 414.42161779688126


In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming `predictions` is a DataFrame containing the predictions of your model on the test data
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='trip_duration', metricName='r2')
r2_score = evaluator.evaluate(predictions)

print('R-squared score on test data:', r2_score)


R-squared score on test data: 0.601951312535913


##2- Decision Tree

In [38]:
from pyspark.ml.regression import DecisionTreeRegressor

# create a decision tree regression model and set its parameters
dt_model = DecisionTreeRegressor(featuresCol='Fvec', labelCol='trip_duration', maxDepth=10)

# train the model on your training data
dt_model = dt_model.fit(train_df)

# make predictions on the test data
predictions = dt_model.transform(test_df)

# evaluate the model's root mean squared error (RMSE) on the test data
evaluator = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

# print the RMSE
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")


Root Mean Squared Error (RMSE) on test data = 375.41748614712577


In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming `predictions` is a DataFrame containing the predictions of your model on the test data
evaluator_r2 = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print('R-squared score on test data:', r2_score)


R-squared score on test data: 0.601951312535913


##3- Random Forest

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# create a random forest regression model and set its parameters
rf_model = RandomForestRegressor(featuresCol='Fvec', labelCol='trip_duration', numTrees=100, maxDepth=10)

# train the model on your training data
rf_model = rf_model.fit(train_df)

# make predictions on the test data
predictions = rf_model.transform(test_df)

# evaluate the model's root mean squared error (RMSE) on the test data
evaluator = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

# print the RMSE
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming `predictions` is a DataFrame containing the predictions of your model on the test data
evaluator_r2 = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print('R-squared score on test data:', r2_score)


##4- XGBoost 

In [34]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create an XGBoost regression model
xgb_model = GBTRegressor(featuresCol='Fvec', labelCol='trip_duration', maxDepth=10)

# Train the model on your training data
xgb_model = xgb_model.fit(train_df)

# Make predictions on the test data
predictions = xgb_model.transform(test_df)

# Evaluate the model's root mean squared error (RMSE) on the test data
evaluator = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

# Evaluate the model's R-squared score on the test data
evaluator_r2 = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print('R-squared score on test data:', r2)


Root Mean Squared Error (RMSE) on test data = 337.25024577350376
R-squared score on test data: 0.7363936047199996


In [37]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create an XGBoost regression model
xgb_model = GBTRegressor(featuresCol='Fvec', labelCol='trip_duration')

# Train the model on your training data
xgb_model = xgb_model.fit(train_df)

# Make predictions on the test data
predictions = xgb_model.transform(test_df)

# Evaluate the model's root mean squared error (RMSE) on the test data
evaluator = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

# Evaluate the model's R-squared score on the test data
evaluator_r2 = RegressionEvaluator(labelCol='trip_duration', predictionCol='prediction', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print('R-squared score on test data:', r2)


Root Mean Squared Error (RMSE) on test data = 306.57279154861004
R-squared score on test data: 0.78699982650488
