In [None]:
# Storage location of the raw taxi data
storage_account_name = "volstore"
container_name = "datastore"
mount_point = "/mnt/blobstorage"

# The account key must never live in the notebook: read it from a Databricks
# secret scope at run time.
# NOTE(review): the scope/key names below are placeholders -- create them in the
# workspace. The key that used to be hard-coded in this cell has been exposed in
# source control history and must be rotated.
storage_account_key = dbutils.secrets.get(scope="blobstorage", key="volstore-account-key")

account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_conf, storage_account_key)

# Unmount only when the mount point is present; an unconditional unmount raises
# when nothing is mounted yet, which breaks the first run of this cell.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the Blob Storage container
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
    mount_point = mount_point,
    extra_configs = {account_key_conf: storage_account_key}
)

# Every column is read as a string (no schema inference is requested)
df = spark.read.option("header", "true").csv("/mnt/blobstorage/Dataset/2014/nyc_taxi_data_2014/nyc_taxi_data_2014.csv")


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from lightgbm import LGBMRegressor as lgb
from pyspark.ml import Pipeline
from pyspark.ml import pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as spark_sum
# sklearn is imported after pyspark on purpose: both export RandomForestRegressor
# and the name must keep resolving to the sklearn estimator, as before.
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

In [None]:
# Display data in the dataset
df.show(5)

# Shape of the raw DataFrame: (rows, columns)
num_rows = df.count()
num_columns = len(df.columns)
print(f"Shape of the DataFrame: ({num_rows}, {num_columns})")

# Count the number of null values for each column and display the result
# (previously the counts were computed but never shown)
null_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()

# Drop exact duplicate rows. The filters below start from this frame; before,
# the de-duplicated result was assigned and then never used.
df_no_duplicates = df.dropDuplicates()

# Keep rows with trip_distance below 5900 and at least one passenger.
# NOTE(review): the earlier comment described this as "trips that lasted less
# than 5900 seconds", but the filter has always been on trip_distance --
# confirm which column/threshold is intended.
df = (
    df_no_duplicates
    .filter(col("trip_distance") < 5900)
    .filter(col("passenger_count") > 0)
)

In [None]:
#Deal with categorical features

# Each categorical column is indexed (string label -> numeric index) and then
# one-hot encoded.
categorical_cols = ['store_and_fwd_flag', 'vendor_id']

encoding_stages = []
for name in categorical_cols:
    encoding_stages.append(StringIndexer(inputCol=name, outputCol=f'{name}_indexed'))
    encoding_stages.append(OneHotEncoder(inputCol=f'{name}_indexed', outputCol=f'{name}_encoded'))

# One pipeline for both columns. Previously the vendor_id pipeline was fitted on
# `df` again and overwrote df_transformed, discarding the store_and_fwd_flag encoding.
# NOTE(review): StringIndexer is left on its default handling of invalid values --
# check whether these columns contain nulls.
pipeline = Pipeline(stages=encoding_stages)
df_transformed = pipeline.fit(df).transform(df)

# Dropping the original categorical columns and the intermediate indexed ones
indexed_cols = [f'{name}_indexed' for name in categorical_cols]
df_transformed = df_transformed.drop(*categorical_cols, *indexed_cols)

#Datetyping the dates
# `df` is a Spark DataFrame, so the features are built with Spark column
# functions; pandas item assignment / .dt accessors do not exist on it.
pickup_ts = F.to_timestamp(col('pickup_datetime'))

# Later steps read a `trip_duration` column (seconds). When the file does not
# ship one, derive it from the two timestamps before dropoff_datetime is dropped.
# NOTE(review): confirm whether the source CSV already contains trip_duration.
if 'trip_duration' not in df.columns and 'dropoff_datetime' in df.columns:
    dropoff_ts = F.to_timestamp(col('dropoff_datetime'))
    df = df.withColumn('trip_duration', F.unix_timestamp(dropoff_ts) - F.unix_timestamp(pickup_ts))

#Date features creations and deletions
df = (
    df
    .withColumn('month', F.month(pickup_ts))
    .withColumn('week', F.weekofyear(pickup_ts))
    # dayofweek is 1=Sunday..7=Saturday; shift to 0=Monday..6=Sunday
    .withColumn('weekday', (F.dayofweek(pickup_ts) + 5) % 7)
    .withColumn('hour', F.hour(pickup_ts))
    .withColumn('minute_oftheday', F.hour(pickup_ts) * 60 + F.minute(pickup_ts))
    # dropoff_datetime: as we don't have this feature in the testset
    .drop('dropoff_datetime', 'pickup_datetime')
)

df.printSchema()


#Great-circle distance between two coordinates
def ft_haversine_distance(lat1, lng1, lat2, lng2):
    """Return the haversine distance in kilometres between two points.

    All four arguments are angles in degrees; they are converted with
    ``np.radians``, so scalars and array-likes numpy can broadcast are accepted.
    The sphere radius used is 6371 km.
    """
    AVG_EARTH_RADIUS = 6371 #km
    phi1, lam1, phi2, lam2 = (np.radians(angle) for angle in (lat1, lng1, lat2, lng2))
    delta_phi = phi2 - phi1
    delta_lam = lam2 - lam1
    # haversine term: sin^2(dphi/2) + cos(phi1) * cos(phi2) * sin^2(dlam/2)
    hav = np.sin(delta_phi * 0.5) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lam * 0.5) ** 2
    return 2 * AVG_EARTH_RADIUS * np.arcsin(np.sqrt(hav))

#Add distance feature
# `df` is a Spark DataFrame: run the numpy implementation as a vectorised pandas UDF
# instead of pulling `.values` (which Spark DataFrames do not have).
@F.pandas_udf('double')
def _haversine_km(lat1: pd.Series, lng1: pd.Series, lat2: pd.Series, lng2: pd.Series) -> pd.Series:
    """Apply ft_haversine_distance to batches of coordinates (degrees in, km out)."""
    return ft_haversine_distance(lat1, lng1, lat2, lng2)

# The CSV is read without a schema, so the coordinates are cast to double explicitly
df = df.withColumn(
    'distance',
    _haversine_km(
        col('pickup_latitude').cast('double'),
        col('pickup_longitude').cast('double'),
        col('dropoff_latitude').cast('double'),
        col('dropoff_longitude').cast('double'),
    ),
)

#Function aiming at calculating the direction
def ft_degree(lat1, lng1, lat2, lng2):
    """Return the bearing from point 1 to point 2, in degrees.

    Inputs are angles in degrees (scalars or array-likes numpy can broadcast).
    The result comes from ``np.arctan2`` converted to degrees, so it lies in
    [-180, 180]: 0 points north, 90 east, -90 west.
    """
    # Only the longitude difference is needed; the latitudes are used individually
    lng_delta_rad = np.radians(lng2 - lng1)
    lat1, lat2 = np.radians(lat1), np.radians(lat2)
    y = np.sin(lng_delta_rad) * np.cos(lat2)
    x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(lng_delta_rad)
    return np.degrees(np.arctan2(y, x))

#Add direction feature
# `df` is a Spark DataFrame: run the numpy implementation as a vectorised pandas UDF
# instead of pulling `.values` (which Spark DataFrames do not have).
@F.pandas_udf('double')
def _bearing_deg(lat1: pd.Series, lng1: pd.Series, lat2: pd.Series, lng2: pd.Series) -> pd.Series:
    """Apply ft_degree to batches of coordinates (degrees in, degrees out)."""
    return ft_degree(lat1, lng1, lat2, lng2)

# The CSV is read without a schema, so the coordinates are cast to double explicitly
df = df.withColumn(
    'direction',
    _bearing_deg(
        col('pickup_latitude').cast('double'),
        col('pickup_longitude').cast('double'),
        col('dropoff_latitude').cast('double'),
        col('dropoff_longitude').cast('double'),
    ),
)


#Remove distance outliers
df = df.filter(col('distance') < 200)

#Speed = distance / duration, used only as a filter (no column is kept).
# Rows with a zero duration yield NULL here and are dropped by the filter below.
# NOTE(review): requires a `trip_duration` column -- confirm it exists in this dataset.
# NOTE(review): with distance in km and duration presumably in seconds this is km/s,
# so a threshold of 30 removes almost nothing -- confirm the intended unit.
trip_seconds = col('trip_duration').cast('double')
speed = F.when(trip_seconds != 0, col('distance') / trip_seconds)

#Remove speed outliers
df = df.filter(speed < 30)

In [None]:
# Detecting outlier trips (outside of NYC) and remove them

# lat and long number comes from & credit to DrGuillermo: Animation
xlim = [-74.03, -73.77]
ylim = [40.63, 40.85]

def _strictly_inside(column_name, bounds):
    """Build the Spark predicate bounds[0] < column < bounds[1] (column cast to double)."""
    value = col(column_name).cast('double')
    return (value > bounds[0]) & (value < bounds[1])

# All four conditions are applied together. Before, every line filtered `df`
# again, so only the last condition (dropoff latitude) actually took effect.
train = df.filter(
    _strictly_inside('pickup_longitude', xlim)
    & _strictly_inside('dropoff_longitude', xlim)
    & _strictly_inside('pickup_latitude', ylim)
    & _strictly_inside('dropoff_latitude', ylim)
)

# matplotlib needs local data: collect a small random sample of the filtered
# pickups (both axes from `train`, previously x came from `df` and y from `train`).
pickup_sample = (
    train
    .select(
        col('pickup_longitude').cast('double').alias('pickup_longitude'),
        col('pickup_latitude').cast('double').alias('pickup_latitude'),
    )
    .sample(False, 0.01, seed=42)
    .toPandas()
)
plt.plot(pickup_sample['pickup_longitude'], pickup_sample['pickup_latitude'], '.', color='k', alpha=0.8)
plt.title('Pickup Location Lat and Long', weight = 'bold');


# only select useful columns
# trip_duration, weekday and distance are included because the statements and
# plots that follow read them; they were missing from the original selection.
useful_columns = [
    'trip_distance', 'fare_amount', 'vendor_id', 'mta_tax', 'passenger_count', 'hour',
    'trip_duration', 'weekday', 'distance',
]
subset_train = train.select(*useful_columns)

# log(1 + duration) as a Spark column expression (numpy cannot operate on a Spark Column)
log_duration = F.log1p(col('trip_duration').cast('double'))

subset_train.sample(False, 0.1, seed=42).show(5)

# Weekday / hour summary plots.
# Aggregation happens in Spark and only the per-group rows are collected, since
# seaborn needs a local pandas frame. `train` is read directly because it carries
# every column used here. sns.factorplot / size= / sns.plt were removed from
# seaborn; catplot / height= / plt are the replacements.
weekday_list = ['Mon','Tues','Wed','Thurs','Fri','Sat','Sun']

def _aggregate_for_plot(group_cols, value_col, agg_func):
    """Aggregate `value_col` of `train` per `group_cols` and return a pandas frame.

    `group_cols` must contain 'weekday'; its numeric value is replaced by the
    matching label from `weekday_list`.
    NOTE(review): assumes weekday is encoded 0 (Mon) .. 6 (Sun) -- confirm.
    """
    summary = (
        train
        .where(col('weekday').isNotNull())
        .groupBy(*group_cols)
        .agg(agg_func(col(value_col).cast('double')).alias(value_col))
        .orderBy(*group_cols)
        .toPandas()
    )
    summary['weekday'] = summary['weekday'].map(lambda day: weekday_list[int(day)])
    return summary

# Avg Trip Durations by Weekday
avg_duration_by_weekday = _aggregate_for_plot(['weekday'], 'trip_duration', F.avg)
g = sns.catplot(kind='bar',
                y='trip_duration',
                x='weekday',
                data=avg_duration_by_weekday,
                height=6,          # Figure height (inches)
                aspect=1.6,        # Width = height * aspect
                order=weekday_list,
                legend_out=False)
plt.title('Avg Trip Durations by Weekday\n', weight = 'bold', size = 20)
plt.xlabel('Weekday', size = 18,weight = 'bold')
plt.ylabel('Average trip duration', size = 18,weight = 'bold')
g.set_xticklabels(rotation=45)

# Total Distance by Weekday
# The plotted column is `distance`, produced with a 6371 km earth radius, so the
# labels say km (they used to say miles and referenced a non-existent column).
total_distance_by_weekday = _aggregate_for_plot(['weekday'], 'distance', F.sum)
g = sns.catplot(kind='bar',
                y='distance',
                x='weekday',
                data=total_distance_by_weekday,
                height=6,
                aspect=1.6,
                order=weekday_list,
                legend_out=False)
plt.title('Total Distance (in km) by Weekday\n', weight = 'bold', size = 20)
plt.xlabel('Weekday', size = 18,weight = 'bold')
plt.ylabel('Total Distance (in km) ', size = 18,weight = 'bold')
g.set_xticklabels(rotation=45)

#Average Duration by Hour of Day and Day of Week
sns.set(font_scale=1.3)
avg_duration_by_hour = _aggregate_for_plot(['hour', 'weekday'], 'trip_duration', F.avg)
g = sns.catplot(kind='point',
                x='hour',
                y='trip_duration',
                hue='weekday',
                hue_order=weekday_list,
                data=avg_duration_by_hour,
                height=8,
                aspect=2,
                legend_out=False)
plt.title('Average Duration by Hour of Day and Day of Week \n',weight='bold', size = 20)
plt.xlabel('start hour', size = 18,weight = 'bold')
plt.ylabel('avg duration', size = 18,weight = 'bold');

In [None]:
# Build the modelling frame for scikit-learn.
# The log features are computed in Spark from `train` (the NYC-bounded trips),
# then a random sample is collected because sklearn needs local pandas data.
# `hour` stays numeric: a string column cannot be fed to the regressors.
# NOTE(review): the 10% sample fraction mirrors the preview sample used earlier;
# lower it if the driver runs out of memory.
model_columns = ['log_duration','log_haversine_distance', 'weekday','month','hour']
df = (
    train
    .withColumn('log_duration', F.log1p(col('trip_duration').cast('double')))
    .withColumn('log_haversine_distance', F.log1p(col('distance')))
    .select(*model_columns)
    .dropna()
    .sample(False, 0.1, seed=42)
    .toPandas()
)

X = df.drop(columns="log_duration")
y = df["log_duration"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Train the models
# Both estimators get a fixed random_state so the reported scores are reproducible.

# GradientBoosting
gb = GradientBoostingRegressor(random_state=42)
gb.fit(X_train, y_train)
print("classic gradient boosting")
# RMSE on the held-out split; predictions are clipped at 0
print(np.sqrt(mean_squared_error(y_test, np.clip(gb.predict(X_test),0,None))))

# RandomForest (the sklearn estimator: it is the last RandomForestRegressor imported)
# max_features=1.0 uses all features, which is what 'auto' meant for regressors
# before that option was removed from scikit-learn.
rfm = RandomForestRegressor(bootstrap=True,max_depth=90,max_features=1.0,min_samples_split=15,min_samples_leaf=10,n_estimators=30,random_state=42)
rfm.fit(X_train, y_train)
print("random forest")
# R^2 on train and test. The former third value, rfm.score(y_test, log_duration),
# passed targets as features and could not be evaluated, so it is dropped.
print(rfm.score(X_train, y_train), rfm.score(X_test, y_test))