In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import isnan, split, element_at, when, col, count, isnull
from pyspark.ml.feature import Imputer
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoder
from pyspark.sql.types import *

In [2]:
# Attach to (or start) the Spark session, then reuse its underlying SparkContext.
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

### Preprocessing

In [None]:
import time
start_time = time.time()

# Merged trip/station/weather records; cached because they are scanned more than once
# below (Imputer fit, then transform).
mtrip = ss.sql("select * from parquet.`s3a://usf.msds.stillbikeshare/merged`")
mtrip.cache()
# The raw weather table is read here only for its schema, which identifies the
# weather-derived columns in the merged data.
weather = ss.read.csv("s3a://usf.msds.stillbikeshare/weather.csv", inferSchema=True, header=True)
weather.cache()

# Numeric weather measurements: every weather column except these key/categorical ones.
# Build a fresh list rather than calling .remove() on `weather.schema.names`: that list is
# owned by the DataFrame's cached schema, so removing from it corrupted weather.schema.
non_numeric_weather_columns = ['date', 'precipitation_inches', 'events', 'zip_code']
# Keep the old fail-fast behaviour (list.remove raised) if an expected column is missing.
_missing_weather_columns = set(non_numeric_weather_columns) - set(weather.columns)
assert not _missing_weather_columns, "weather.csv lacks expected columns: %s" % sorted(_missing_weather_columns)
columns = [name for name in weather.columns if name not in non_numeric_weather_columns]

columns

# Cast the numeric weather columns to float; Imputer only accepts float/double inputs.
# Reassign `mtrip` on every pass. The previous loop rebuilt an unused `mtrip_df` from the
# *uncast* `mtrip` each time, so no cast ever took effect. The loop variable is named
# `weather_col` so it no longer shadows pyspark.sql.functions.col.
for weather_col in columns:
    mtrip = mtrip.withColumn(weather_col, mtrip[weather_col].cast(FloatType()))
mtrip

# Median-impute the numeric weather columns; each result lands in an "out_<name>" copy.
imputed_columns = list(map(lambda name: "out_" + name, columns))
imputer = Imputer(inputCols=columns, outputCols=imputed_columns, strategy="median")
mtrip_weather_imputed_part_one = imputer.fit(mtrip)  # fitted ImputerModel
mtrip_imp_weather_part_one = mtrip_weather_imputed_part_one.transform(mtrip)
mtrip_imp_weather_part_one.schema.names

# Check that out_max_gust_speed_mph has no remaining missing values. Imputer treats both
# NaN and null as missing, so count both (the old check counted NaN only).
mtrip_imp_weather_part_one.select('out_max_gust_speed_mph').show()
mtrip_imp_weather_part_one.select(
    count(when(isnan('out_max_gust_speed_mph') | isnull('out_max_gust_speed_mph'),
               'out_max_gust_speed_mph')).alias('count_max_gust_speed_mph')
).show()

# Frequency table of precipitation_inches, used to pick its mode.
mtrip_imp_weather_part_one.groupBy('precipitation_inches').count().orderBy('count', ascending=False).show()

# Mode imputation. '0' is hard-coded, presumably read off the frequency table above;
# re-check it if the source data changes.
mtrip_weather_imputed_fin = mtrip_imp_weather_part_one.fillna({'precipitation_inches':'0'})
# Verify on the *filled* frame (the old check inspected the unfilled one). Count nulls,
# which is what fillna replaces, as well as NaN.
mtrip_weather_imputed_fin.select(
    count(when(isnull('precipitation_inches') | isnan('precipitation_inches'),
               'precipitation_inches')).alias('count_precipitation_inches')
).show()

# Drop the raw numeric weather columns; their imputed "out_" copies remain. A single
# drop(*columns) replaces the old loop, whose variable `col` shadowed the
# pyspark.sql.functions.col imported at the top of the notebook.
mtrip_weather_imputed_fin = mtrip_weather_imputed_fin.drop(*columns)
type(mtrip_weather_imputed_fin.schema['start_city'].dataType)

mtrip_weather_imputed_fin

# Names of every column whose Spark type is StringType, in schema order.
string_names = [
    field_name
    for field_name in mtrip_weather_imputed_fin.schema.names
    if mtrip_weather_imputed_fin.schema[field_name].dataType == StringType()
]
string_names

# Categorical string columns to index. The raw date strings are not listed here; they are
# handled by the date-feature step below.
string_names_without_date = ['start_station_name', 'end_station_name', 'subscription_type', 'precipitation_inches', 'events', 'start_name', 'start_city', 'end_name', 'end_city']
indexed_string_names_without_date = list(map(lambda name: 'ind_' + name, string_names_without_date))

# Replace each string column with its StringIndexer index under the same column name.
# handleInvalid="keep" sends labels the indexer considers invalid to an extra index
# instead of raising.
for source_name, target_name in zip(string_names_without_date, indexed_string_names_without_date):
    indexed_train_fit = (
        StringIndexer(inputCol=source_name, outputCol=target_name)
        .setHandleInvalid("keep")
        .fit(mtrip_weather_imputed_fin)
    )
    mtrip_weather_imputed_fin = (
        indexed_train_fit.transform(mtrip_weather_imputed_fin)
        .drop(source_name)
        .withColumnRenamed(target_name, source_name)
    )

# Spot-check two of the indexed columns.
mtrip_weather_imputed_fin.select('subscription_type', 'precipitation_inches').show(10)

# String columns left un-indexed (presumably just the date strings).
set(string_names) - set(string_names_without_date)

# encode start_date and end_date
# (udf/desc/datetime are no longer used in this cell; the imports are kept in case a
# later cell relies on them.)
from pyspark.sql.functions import udf,desc
from datetime import datetime


def _date_part_columns(date_col_name):
    """Build integer weekday/month/hour Column expressions from a string date column.

    Values are expected in the '%m/%d/%Y %H:%M' layout (e.g. '8/29/2013 14:13') that the
    previous per-row UDFs parsed with strptime.

    Returns a dict with keys 'weekday' (0 = Sunday ... 6 = Saturday, the strftime('%w')
    convention the UDFs produced), 'month' (1-12) and 'hour' (0-23). With Spark's default
    (non-ANSI) settings, values that do not parse become null instead of failing the job.
    """
    date_and_time = F.split(F.col(date_col_name), ' ')
    # Parse only the calendar date; the hour is read straight from the string, so no
    # time-zone/DST conversion can shift it.
    day = F.to_date(date_and_time.getItem(0), 'M/d/yyyy')
    return {
        # Spark's dayofweek is 1 = Sunday ... 7 = Saturday; shift to 0-based.
        'weekday': (F.dayofweek(day) - 1).cast(IntegerType()),
        'month': F.month(day).cast(IntegerType()),
        'hour': F.split(date_and_time.getItem(1), ':').getItem(0).cast(IntegerType()),
    }


# Built-in column expressions replace six copy-pasted Python UDFs, and the six
# .sort(desc(...)) calls are gone: each forced a full shuffle, and sorting 'M/d/yyyy'
# strings lexicographically is not chronological anyway. The features are already
# integers, so the separate cast pass is no longer needed.
_date_parts = {'start': _date_part_columns('start_date'), 'end': _date_part_columns('end_date')}
# Same column order as before: start_weekday, end_weekday, start_month, end_month,
# start_hour, end_hour.
for part in ['weekday', 'month', 'hour']:
    for side in ['start', 'end']:
        mtrip_weather_imputed_fin = mtrip_weather_imputed_fin.withColumn(side + '_' + part, _date_parts[side][part])

mtrip_weather_imputed_fin.select('end_hour').show()

# The engineered integer date features supersede the raw date strings; installation
# dates and the weather 'date' are also discarded.
mtrip_weather_imputed_fin = mtrip_weather_imputed_fin.drop(
    'start_date',
    'end_date',
    'start_date_no_time',
    'start_installation_date',
    'end_installation_date',
    'date',
)

mtrip_weather_imputed_fin

# one hot encode weekday month hour

def oneHotEncodeColumns(df, cols):
    """One-hot encode each column in ``cols``, keeping the original column names.

    Each listed column is replaced by a vector column of the same name, which is moved
    to the end of the schema in the order given by ``cols``. ``dropLast=False`` keeps
    every category, so no category collapses into the all-zeros vector.

    Args:
        df: Spark DataFrame containing the columns to encode. The columns are presumably
            non-negative integer category indices (what the encoder expects); confirm
            for new inputs.
        cols: iterable of column names to encode. If it is empty, ``df`` is returned
            unchanged.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    # The Transformer form of OneHotEncoder used previously (encoder.transform without a
    # fit) is deprecated since Spark 2.3 and removed in 3.0. The Estimator form fits all
    # columns in one pass. Spark 2.3/2.4 call it OneHotEncoderEstimator; 3.x renamed it
    # to OneHotEncoder.
    try:
        from pyspark.ml.feature import OneHotEncoderEstimator as _Encoder
    except ImportError:
        from pyspark.ml.feature import OneHotEncoder as _Encoder

    cols = list(cols)
    if not cols:
        return df

    # Encode into temporary "<col>-onehot" columns, drop the originals, then rename
    # the vectors back to the original names.
    onehot_cols = [c + "-onehot" for c in cols]
    encoder = _Encoder(inputCols=cols, outputCols=onehot_cols, dropLast=False)
    newdf = encoder.fit(df).transform(df).drop(*cols)
    for c, onehot_c in zip(cols, onehot_cols):
        newdf = newdf.withColumnRenamed(onehot_c, c)
    return newdf

# One-hot encode the six integer date features into vector columns of the same names.
df_mtrip_spark = oneHotEncodeColumns(
    mtrip_weather_imputed_fin,
    [
        "start_weekday", "end_weekday",
        "start_month", "end_month",
        "start_hour", "end_hour",
    ],
)

df_mtrip_spark.show()

In [None]:
print("--- {} seconds ---".format(time.time() - start_time))  # wall time since the preprocessing cell started

In [None]:
# Persist the cleaned, one-hot encoded data. mode("overwrite") keeps the cell re-runnable:
# the default mode ("errorifexists") fails on any re-run because the path already exists.
clean_data_path = "s3a://usf.msds.stillbikeshare/clean_data_bike_share"
df_mtrip_spark.write.mode("overwrite").parquet(clean_data_path)

# Read the written data back to confirm the round trip.
df_mtrip_read = ss.read.parquet(clean_data_path)
df_mtrip_read.show()

# Install Python packages into the running session through the SparkContext.
# NOTE(review): install_pypi_package looks like the Amazon EMR Notebooks notebook-scoped
# library API; confirm. Installs like these belong in the first cell, before any import.
sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
# NOTE(review): matplotlib is not version-pinned (re-runs may get a different release)
# and is not used anywhere in this notebook.
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") 

import pandas as pd

# Collect to the driver as a pandas DataFrame.
df_mtrip = mtrip_weather_imputed_fin.toPandas()

# Append pandas dummy columns for both weekday features: start_weekday_* first, then end_weekday_*.
df_mtrip = pd.concat(
    [df_mtrip]
    + [pd.get_dummies(df_mtrip[day_col], prefix=day_col) for day_col in ('start_weekday', 'end_weekday')],
    axis=1,
)

df_mtrip

# one hot encode weekday
mtrip_weather_imputed_fin.show()

# `encoder` was never defined in this notebook (NameError on a fresh kernel), so it is
# built here. The original settings are unknown; dropLast=False matches
# oneHotEncodeColumns. The output goes to new "_onehot" columns, so the integer weekday
# columns are kept.
encoder = OneHotEncoderEstimator(
    inputCols=['start_weekday', 'end_weekday'],
    outputCols=['start_weekday_onehot', 'end_weekday_onehot'],
    dropLast=False,
)
model = encoder.fit(mtrip_weather_imputed_fin)

encoded = model.transform(mtrip_weather_imputed_fin)

encoded

# The weather 'date' adds little, because the weather data was merged by zip code. Station
# installation dates say nothing about an individual trip either. The trip's own
# start_date/end_date features are the more relevant predictors. These columns were
# already dropped earlier, so this is only a safeguard; drop() ignores names that are
# not present.
mtrip_weather_imputed_fin = mtrip_weather_imputed_fin.drop('start_installation_date', 'end_installation_date', 'date')

mtrip_weather_imputed_fin.select('end_hour').distinct().show()

# No 'weekdays' column is ever created (selecting it fails analysis); the weekday
# features are start_weekday and end_weekday.
mtrip_weather_imputed_fin.select('start_weekday', 'end_weekday')

# CSV export of the indexed (not one-hot encoded) features. Spark writes a directory of
# part files at this path despite the ".csv" suffix. mode='overwrite' keeps the cell
# re-runnable; the default mode errors when the path already exists.
mtrip_weather_imputed_fin.write.save("s3a://usf.msds.stillbikeshare/cleaned_bike_share.csv", format='csv', header=True, mode='overwrite')

# Preview only: toPandas() on the full frame would collect every row onto the driver.
mtrip_weather_imputed_fin.limit(20).toPandas()