In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor, GeneralizedLinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.sql.types import *
import pyspark.sql.functions as F

### I. Creating the dataset

In [2]:
spark = SparkSession.builder.appName("ML-Example").getOrCreate()

# Load the Q3-2017 Capital Bikeshare trip log and keep only the four columns the
# model uses. The station id is cast to int so it can be joined to `stations`.
# NOTE(review): no schema is supplied to the reader, so the other columns
# (e.g. 'Duration') presumably load as strings and rely on implicit casts later — confirm.
df = (
    spark.read.csv("/home/jovyan/data/rides/2017Q3-capitalbikeshare-tripdata.csv", header=True)
    .select('Duration', 'Start date', 'Start station number', 'Member type')
    .withColumn('Start station number', F.col('Start station number').cast(IntegerType()))
)
print(f'There are {df.count()} rows in the dataset')
df.show(5)

There are 1191585 rows in the dataset
+--------+-------------------+--------------------+-----------+
|Duration|         Start date|Start station number|Member type|
+--------+-------------------+--------------------+-----------+
|    2762|2017-07-01 00:01:09|               31289|     Casual|
|    2763|2017-07-01 00:01:24|               31289|     Casual|
|     690|2017-07-01 00:01:45|               31122|     Member|
|     134|2017-07-01 00:01:46|               31201|     Member|
|     587|2017-07-01 00:02:05|               31099|     Casual|
+--------+-------------------+--------------------+-----------+
only showing top 5 rows



In [3]:
# Station reference table: give the coordinates ride-specific names and expose
# TERMINAL_NUMBER under the same int column name the rides use, for the join.
# NOTE(review): the glob may match several files; if a TERMINAL_NUMBER appears
# more than once, the later join will duplicate rides — confirm ids are unique.
stations = spark.read.csv("/home/jovyan/data/stations/*", header=True)
print(f'There are {stations.count()} rows in the stations')
stations = (
    stations
    .withColumnRenamed('LATITUDE', 'start_station_lat')
    .withColumnRenamed('LONGITUDE', 'start_station_long')
    .withColumn('Start station number', F.col('TERMINAL_NUMBER').cast(IntegerType()))
    .select('start_station_lat', 'start_station_long', 'Start station number')
)
stations.show(5)

There are 571 rows in the stations
+-----------------+------------------+--------------------+
|start_station_lat|start_station_long|Start station number|
+-----------------+------------------+--------------------+
|        39.083673|        -77.149162|               32017|
|        39.123513|         -77.15741|               32018|
|        38.990249|         -77.02935|               32019|
|        39.107709|        -77.152072|               32020|
|        38.982456|        -77.091991|               32021|
+-----------------+------------------+--------------------+
only showing top 5 rows



In [4]:
# Clean the ride records before modelling:
#  * keep trips from 3 minutes up to 1.5 hours, both bounds inclusive (Duration is in seconds),
#  * drop rides whose 'Member type' is 'Unknown',
#  * drop start stations 31008, 32051 and 32034, treated here as non-existent
#    (NOTE(review): presumably missing from the stations table — confirm),
# then add the regression target: label = log(1 + Duration).
three_minutes = 60 * 3
one_and_a_half_hours = 60 * 60 * 1.5

df = (
    df
    .filter(F.col('Duration').between(three_minutes, one_and_a_half_hours))
    .filter(F.col('Member type') != 'Unknown')
    .filter(~F.col('Start station number').isin(31008, 32051, 32034))
    .withColumn('label', F.log1p(F.col('Duration')))
)

In [5]:
# Attach each ride's start-station coordinates and make them numeric.
# NOTE(review): this is a left join, so a ride whose station is absent from
# `stations` keeps null coordinates; VectorAssembler's default handleInvalid='error'
# would then fail at fit time — confirm every remaining station id is present.
df = (
    df.join(stations, on='Start station number', how='left')
    .withColumn('start_station_long', F.col('start_station_long').cast(DoubleType()))
    .withColumn('start_station_lat', F.col('start_station_lat').cast(DoubleType()))
)
print(f'Complete dataset has {df.count()} rows')
df.show(5)

Complete dataset has 1116875 rows
+--------------------+--------+-------------------+-----------+-----------------+-----------------+------------------+
|Start station number|Duration|         Start date|Member type|            label|start_station_lat|start_station_long|
+--------------------+--------+-------------------+-----------+-----------------+-----------------+------------------+
|               31289|    2762|2017-07-01 00:01:09|     Casual|7.924072324923417|        38.890544|        -77.049379|
|               31289|    2763|2017-07-01 00:01:24|     Casual| 7.92443418488756|        38.890544|        -77.049379|
|               31122|     690|2017-07-01 00:01:45|     Member| 6.53813982376767|        38.928893|         -77.03625|
|               31099|     587|2017-07-01 00:02:05|     Casual|6.376726947898627|        38.813485|        -77.049468|
|               31099|     586|2017-07-01 00:02:06|     Casual|6.375024819828097|        38.813485|        -77.049468|
+-------------

### II. EDA
Exploratory data analysis (EDA) is covered in the separate `bike-share-eda.ipynb` notebook.

---

### III. Prediction Pipeline with PySpark

In [6]:
# Parse the start timestamp and pull out the integer calendar components.
df = df.withColumn("Start date", F.to_timestamp('Start date', 'yyyy-MM-dd HH:mm:ss'))

calendar_extractors = [
    ("day_of_week", F.dayofweek),
    ("week_of_year", F.weekofyear),
    ("month", F.month),
    ("minute", F.minute),
    ("hour", F.hour),
]
for part, extract in calendar_extractors:
    df = df.withColumn(part, extract("Start date"))

pi = 3.141592653589793

# Project every component onto the unit circle, angle = 2*pi*value/period, so that
# the ends of each cycle (e.g. hour 23 and hour 0) end up close together.
# Month is shifted by one so January sits at angle 0.
angles = {
    'day_of_week': 2 * pi * F.col('day_of_week') / 7,
    'week_of_year': 2 * pi * F.col('week_of_year') / 53,
    'month': 2 * pi * (F.col('month') - 1) / 12,
    'minute': 2 * pi * F.col('minute') / 60,
    'hour': 2 * pi * F.col('hour') / 24,
}
# All sin_* columns first, then all cos_*, each in the order of `angles`.
for trig_name, trig_fn in (('sin', F.sin), ('cos', F.cos)):
    for part, angle in angles.items():
        df = df.withColumn(f'{trig_name}_{part}', trig_fn(angle))

# Drop the raw timestamp, station id, duration and the intermediate integer
# components; only their sin/cos encodings go on to the model.
df = df.drop("Start date", "Start station number", "Duration", "month", "hour", "minute", "day_of_week", "week_of_year")

In [7]:
# Spot-check the engineered feature table (first three rows).
df.show(n=3)

+-----------+-----------------+-----------------+------------------+--------------------+-------------------+--------------------+-------------------+--------+---------------+-------------------+---------+------------------+--------+
|Member type|            label|start_station_lat|start_station_long|     sin_day_of_week|   sin_week_of_year|           sin_month|         sin_minute|sin_hour|cos_day_of_week|   cos_week_of_year|cos_month|        cos_minute|cos_hour|
+-----------+-----------------+-----------------+------------------+--------------------+-------------------+--------------------+-------------------+--------+---------------+-------------------+---------+------------------+--------+
|     Casual|7.924072324923417|        38.890544|        -77.049379|-2.44929359829470...|0.05924062789371414|1.224646799147353...|0.10452846326765346|     0.0|            1.0|-0.9982437317643215|     -1.0|0.9945218953682733|     1.0|
|     Casual| 7.92443418488756|        38.890544|        -77.049

In [8]:
# 'Member type' is categorical: map its labels to numeric indices, then one-hot
# encode those indices (presumably with Spark's default dropLast=True, i.e. one
# reference category is dropped — confirm against the Spark version in use).
rider_indexer = StringIndexer(outputCol='rider_idx', inputCol='Member type')
rider_encoder = OneHotEncoder(outputCol='rider_enc', inputCol='rider_idx')

In [9]:
# Gather every model input into a single vector column. The list order fixes each
# feature's position in the vector, and therefore in the GLR coefficients:
# station coordinates, the encoded rider type, then a sin/cos pair per calendar cycle.
cyclical_parts = ['day_of_week', 'week_of_year', 'month', 'minute', 'hour']
feature_cols = ['start_station_lat', 'start_station_long', 'rider_enc']
feature_cols += [f'{trig}_{part}' for part in cyclical_parts for trig in ('sin', 'cos')]

vector = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Rescale the assembled vector. With Spark's defaults (withStd=True, withMean=False),
# each feature is divided by its standard deviation but is not mean-centred.
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

In [10]:
# Generalized linear regression on the scaled feature vector.
# (A RandomForestRegressor(featuresCol='scaled_features') was tried as an alternative.)
glr = GeneralizedLinearRegression(featuresCol='scaled_features')

# Preprocessing plus the model as one estimator, so that every cross-validation
# fold refits the indexer, encoder and scaler on its own training split.
pipeline = Pipeline(stages=[rider_indexer, rider_encoder, vector, scaler, glr])

# Uses the evaluator defaults: RMSE between the 'label' and 'prediction' columns.
evaluation = RegressionEvaluator()

# 2 x 2 x 2 = 8 hyper-parameter combinations.
grid = (
    ParamGridBuilder()
    .addGrid(glr.maxIter, [25, 35])
    .addGrid(glr.family, ["gamma", "poisson"])
    .addGrid(glr.regParam, [0.0, 0.5])
    .build()
)

In [11]:
# 7-fold cross-validation of the whole pipeline over every grid point:
# 8 param maps x 7 folds = 56 pipeline fits, plus the final refit of the winner.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=evaluation,
    estimatorParamMaps=grid,
    numFolds=7,
)

In [None]:
# Hold out 30% of the rides for a final test. The fixed seed makes the split
# reproducible across kernel restarts, and with it every metric reported below.
train, test = df.randomSplit([.7, .3], seed=42)
models = cv.fit(train)     # CrossValidatorModel
best = models.bestModel    # PipelineModel refit with the best-scoring param map

In [None]:
# Cross-validated metric (RMSE, the evaluator's default; lower is better) averaged
# over the folds for each grid point. avgMetrics follows the order of `grid`, so
# each value is paired with the hyper-parameters that produced it.
[
    ({param.name: value for param, value in param_map.items()}, metric)
    for param_map, metric in zip(grid, models.avgMetrics)
]

In [None]:
# Score the held-out rides with the winning pipeline
# (CrossValidatorModel.transform delegates to this same best model).
results = best.transform(test)

In [None]:
# Test-set RMSE, on the label scale: log1p of the ride duration in seconds.
evaluation.evaluate(dataset=results)

In [None]:
def extract_best_params(params: dict) -> dict:
    """Re-key a param map by parameter name.

    Args:
        params: mapping whose keys expose a ``.name`` attribute (e.g. the result
            of a Spark ML ``extractParamMap()``), with the corresponding values.

    Returns:
        A new dict ``{key.name: value}`` in the same iteration order as ``params``.
        If two keys share a name, the value seen later wins.
    """
    return {param.name: value for param, value in params.items()}

In [None]:
# The fitted GLR model is the last stage of the best pipeline. Note that
# extractParamMap() returns every parameter (defaults included), not only the tuned ones.
best_algo = best.stages[-1]
extract_best_params(params=best_algo.extractParamMap())