<a href="http://www.calstatela.edu/centers/hipic"><img align="left" src="https://avatars2.githubusercontent.com/u/4156894?v=3&s=100"/>
</a>
<img align="right" alt="California State University, Los Angeles" src="http://www.calstatela.edu/sites/default/files/groups/California%20State%20University%2C%20Los%20Angeles/master_logo_full_color_horizontal_centered.svg" style="width: 360px;"/>

# CIS5560 Term Project Tutorial

------
#### Authors: [Anupam Sahay](https://www.linkedin.com/in/anupam-sahay-9514a349/); Krithy Nanaiah Atrangada; Neha Shashidhara Guli

#### Instructor: [Jongwook Woo](https://www.linkedin.com/in/jongwook-woo-7081a85)

#### Date: 05/16/2018

In [3]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new
# session registered under the application name 'problem'.
spark = (
    SparkSession.builder
    .appName('problem')
    .getOrCreate()
)

In [4]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# Build the SQLContext from the session created above rather than from the
# bare `sc` global, which only exists when the runtime pre-injects it.
# This keeps the notebook runnable from a fresh kernel.
sqlContext = SQLContext(spark.sparkContext)

In [5]:
%fs ls /FileStore/tables/weather.csv

In [6]:
%fs ls /FileStore/tables/status.csv

In [7]:
%fs ls /FileStore/tables/trip.csv

In [8]:
%fs ls /FileStore/tables/station.csv

In [9]:
# Load the station status CSV: the first row is the header and column types
# are inferred from the data.
status_data = spark.read.csv(
    path="dbfs:/FileStore/tables/status.csv",
    header=True,
    inferSchema=True,
)
display(status_data)

In [10]:
# Load the weather CSV: the first row is the header and column types are
# inferred from the data.
weather_data = spark.read.csv(
    path="dbfs:/FileStore/tables/weather.csv",
    header=True,
    inferSchema=True,
)
display(weather_data)

In [11]:
# Load the trip CSV: the first row is the header and column types are
# inferred from the data.
trip_data = spark.read.csv(
    path="dbfs:/FileStore/tables/trip.csv",
    header=True,
    inferSchema=True,
)
display(trip_data)

In [12]:
# Load the station CSV: the first row is the header and column types are
# inferred from the data.
station_data = spark.read.csv(
    path="dbfs:/FileStore/tables/station.csv",
    header=True,
    inferSchema=True,
)
display(station_data)

In [13]:
trip_data.show()

In [14]:
from pyspark.sql.functions import isnan, count, when

# Count missing values per column of status_data.
#  * NULL is checked explicitly, because isnan() is False for NULL.
#  * Matching rows are flagged with the literal 1: count() ignores NULLs, so
#    count(when(cond, c)) can never count a row whose `c` is itself NULL.
#  * isnan() is only applied to float/double columns, the types it is
#    defined for.
status_missing = [
    count(when(
        (status_data[c].isNull() | isnan(status_data[c]))
        if dtype in ('float', 'double')
        else status_data[c].isNull(),
        1,
    )).alias(c)
    for c, dtype in status_data.dtypes
]
status_data.select(status_missing).show()

In [15]:
from pyspark.sql.functions import isnan, count, when

# Count missing values per column of trip_data.
#  * NULL is checked explicitly, because isnan() is False for NULL.
#  * Matching rows are flagged with the literal 1: count() ignores NULLs, so
#    count(when(cond, c)) can never count a row whose `c` is itself NULL.
#  * isnan() is only applied to float/double columns, the types it is
#    defined for.
trip_missing = [
    count(when(
        (trip_data[c].isNull() | isnan(trip_data[c]))
        if dtype in ('float', 'double')
        else trip_data[c].isNull(),
        1,
    )).alias(c)
    for c, dtype in trip_data.dtypes
]
trip_data.select(trip_missing).show()

In [16]:
# NOTE: this import rebinds the names `min` and `max` to the Spark column
# functions for the rest of the notebook.
from pyspark.sql.functions import mean, min, max, stddev

# Mean, minimum, maximum and standard deviation of the raw `duration` column.
duration_stats = [stat('duration') for stat in (mean, min, max, stddev)]
trip_data.select(duration_stats).show()

In [17]:
# Add `duration_new` = duration / 60.
# NOTE(review): presumably converts seconds to minutes - confirm the unit of
# the raw `duration` column.
trip_data = trip_data.withColumn('duration_new', trip_data['duration'] / 60)

In [18]:
# The raw `duration` column is superseded by `duration_new`, so remove it.
trip_data = trip_data.drop('duration')

In [19]:
trip_data.show()

In [20]:
# Mean, minimum, maximum and standard deviation of the rescaled duration.
trip_data.select(
    mean('duration_new'),
    min('duration_new'),
    max('duration_new'),
    stddev('duration_new'),
).show()

In [21]:
trip_data.count()

In [22]:
# Keep only trips whose rescaled duration is at most 360; longer trips are
# discarded from here on.
trip_data = trip_data.where(trip_data.duration_new <= 360)

In [23]:
trip_data.count()

In [24]:
trip_data.show()

In [25]:
# Collect the filtered trips into a pandas DataFrame.
# NOTE: from this cell onward `trip_data` is a pandas object, not a Spark
# DataFrame, so re-running earlier Spark cells on it will fail.
trip_data = trip_data.toPandas()

In [26]:
trip_data.shape

In [27]:
#Convert to datetime so that it can be manipulated more easily
import pandas as pd
from collections import Counter

trip_data['start_date'] = pd.to_datetime(trip_data['start_date'], format='%m/%d/%Y %H:%M')
trip_data['date'] = trip_data['start_date'].dt.date

# Tally trips per calendar day. Counter is an insertion-ordered dict, so
# `dates` holds the same keys, counts and order as a manual counting loop.
dates = Counter(trip_data['date'])

# Materialise the items as a list: a dict view is not a sequence, and not
# every pandas release accepts one in the DataFrame constructor.
train = pd.DataFrame(list(dates.items()), columns=['date', 'trips'])

In [28]:
# Turn the per-day trip counts into a Spark DataFrame; `train` is rebound
# from the pandas frame to the Spark one.
train = sqlContext.createDataFrame(train)

In [29]:
train.show()

In [30]:
# Order the daily counts chronologically (earliest date first).
train = train.orderBy('date', ascending=True)

In [31]:
train.show()

In [32]:
weather_data.show()

In [33]:
# Count missing values per column of weather_data.
#  * NULL is checked explicitly, because isnan() is False for NULL.
#  * Matching rows are flagged with the literal 1: count() ignores NULLs, so
#    count(when(cond, c)) can never count a row whose `c` is itself NULL.
#  * isnan() is only applied to float/double columns, the types it is
#    defined for.
weather_missing = [
    count(when(
        (weather_data[c].isNull() | isnan(weather_data[c]))
        if dtype in ('float', 'double')
        else weather_data[c].isNull(),
        1,
    )).alias(c)
    for c, dtype in weather_data.dtypes
]
weather_data.select(weather_missing).show()

In [34]:
# Round-trip through pandas to parse `date` (month/day/year text) into real
# timestamps, then hand the result back to Spark.
weather_pdf = weather_data.toPandas()
weather_pdf['date'] = pd.to_datetime(weather_pdf['date'], format='%m/%d/%Y')
weather_data = sqlContext.createDataFrame(weather_pdf)

In [35]:
print(train.count())
print(weather_data.count())

In [36]:
weather_data.select('zip_code').distinct().show()

In [37]:
# Keep only the weather readings for zip code 94107.
weather_data = weather_data.where(weather_data.zip_code == 94107)

In [38]:
weather_data.select('events').distinct().show()

In [39]:
# Label missing values as 'Normal' and fold the lowercase 'rain' spelling
# into 'Rain'.
# NOTE(review): neither call passes `subset`, so they are not limited to the
# `events` column - confirm that touching other string columns is intended.
weather_data = (
    weather_data
    .na.fill('Normal')
    .na.replace('rain', 'Rain')
)

In [40]:
weather_data.select('events').distinct().show()

In [41]:
weather_data = weather_data.toPandas()
# One indicator column per distinct value of `events`, aligned to the
# original rows through the shared index.
events = pd.get_dummies(weather_data.events)
weather_data = weather_data.merge(events, left_index=True, right_index=True)
#Remove features we don't need
# `axis` is passed by keyword: the positional form drop(labels, 1) is no
# longer accepted by pandas 2.x.
weather_data = weather_data.drop(['events', 'zip_code'], axis=1)

In [42]:
weather_data = sqlContext.createDataFrame(weather_data)

In [43]:
# Discard weather rows that still contain a missing value; called without
# arguments, so no column subset or threshold is applied.
weather_data = weather_data.na.drop()

In [44]:
weather_data.select('precipitation_inches').show(700)

In [45]:
weather_data = weather_data.toPandas()

# Non-numeric precipitation readings become NaN ...
weather_data['precipitation_inches'] = pd.to_numeric(
    weather_data['precipitation_inches'], errors='coerce')
# ... and are then imputed with the median of the valid readings.
# Series.median() skips NaN by default, so no explicit notnull mask is needed.
precip_median = weather_data['precipitation_inches'].median()
weather_data['precipitation_inches'] = weather_data['precipitation_inches'].fillna(precip_median)

# Reduce `date` to a plain calendar date. No string `format` is given: the
# column was already parsed to timestamps by an earlier cell.
weather_data['date'] = pd.to_datetime(weather_data['date']).dt.date
weather_data = sqlContext.createDataFrame(weather_data)

In [46]:
# Attach each day's weather readings to its trip count. Left outer join, so
# days without a weather row are kept.
train1 = train.join(weather_data, on='date', how='left_outer')

In [47]:
train1.show()

In [48]:
station_data.show()

In [49]:
#Good, each station is only listed once (the two counts printed below match)
distinct_station_rows = station_data.distinct().count()
all_station_rows = station_data.count()
print(distinct_station_rows)
print(all_station_rows)

In [50]:
station_data = station_data.toPandas()
train1 = train1.toPandas()

# Parse installation dates (month/day/year) and keep only the calendar date.
installed_on = pd.to_datetime(station_data['installation_date'], format="%m/%d/%Y")
station_data['installation_date'] = installed_on.dt.date

# For every day in train1, add up the docks of all stations installed on or
# before that day.
total_docks = [
    sum(station_data[station_data.installation_date <= day].dock_count)
    for day in train1.date
]
train1['total_docks'] = total_docks

# Hand both frames back to Spark.
station_data = sqlContext.createDataFrame(station_data)
train1 = sqlContext.createDataFrame(train1)

In [51]:
train1.show()

In [52]:
#Find all of the holidays during our time span
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

train = train1.toPandas()
first_day, last_day = train.date.min(), train.date.max()

calendar = USFederalHolidayCalendar()
holidays = calendar.holidays(start=first_day, end=last_day)

# Business days = weekdays that are not US federal holidays.
# pd.date_range is used because building a range with
# pd.DatetimeIndex(start=..., end=..., freq=...) is not supported by current
# pandas releases.
us_bd = CustomBusinessDay(calendar=USFederalHolidayCalendar())
business_days = pd.date_range(start=first_day, end=last_day, freq=us_bd)

# Both are DatetimeIndex objects; reduce them to plain calendar dates for the
# membership tests against train.date below.
business_days = business_days.date
holidays = holidays.date

#A 'business_day' or 'holiday' is a date within either of the respective lists.
#Stored as 1/0 rather than True/False.
train['business_day'] = train.date.isin(business_days).astype('int64')
train['holiday'] = train.date.isin(holidays).astype('int64')

# Parse the dates once and derive the calendar features from that.
trip_dates = pd.to_datetime(train['date'])
train['year'] = trip_dates.dt.year
train['month'] = trip_dates.dt.month
train['weekday'] = trip_dates.dt.weekday

labels = train.trips
# `axis` is passed by keyword: the positional form drop(labels, 1) is no
# longer accepted by pandas 2.x.
train = train.drop(['date'], axis=1)
train = sqlContext.createDataFrame(train)

In [53]:
train.show()

In [54]:
# Use `trips` as the regression label under the name `target`, and discard
# rows that contain a missing value.
train = (
    train
    .withColumnRenamed('trips', 'target')
    .na.drop()
)

In [55]:
from pyspark.ml.feature import VectorAssembler

# Every predictor column, in the order it appears in the feature vector:
# weather readings, weather-event indicators, dock capacity, calendar flags.
feature_columns = [
    'max_temperature_f', 'mean_temperature_f', 'min_temperature_f',
    'max_dew_point_f', 'mean_dew_point_f', 'min_dew_point_f',
    'max_humidity', 'mean_humidity', 'min_humidity',
    'max_sea_level_pressure_inches',
    'mean_sea_level_pressure_inches',
    'min_sea_level_pressure_inches',
    'max_visibility_miles', 'mean_visibility_miles', 'min_visibility_miles',
    'max_wind_Speed_mph', 'mean_wind_speed_mph', 'max_gust_speed_mph',
    'precipitation_inches',
    'cloud_cover',
    'wind_dir_degrees',
    'Fog', 'Fog-Rain', 'Normal', 'Rain', 'Rain-Thunderstorm',
    'total_docks',
    'business_day', 'holiday',
    'year', 'month', 'weekday',
]

# Packs the columns above into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [56]:
output = assembler.transform(train)

In [57]:
# Keep only the assembled feature vector and the label for modelling.
final_data = output.select('features', 'target')
final_data.show()

In [58]:
#split data
# 80/20 train/test split. The seed is fixed so the same rows land in each
# split on every run and the metrics reported below are reproducible.
train, test = final_data.randomSplit([0.8, 0.2], seed=42)

In [59]:
from pyspark.ml.regression import DecisionTreeRegressor

# Decision-tree regressor reading the 'features' vector and predicting 'target'.
dTree = DecisionTreeRegressor(featuresCol='features', labelCol='target')

In [60]:
# Fit the decision tree on the training split ...
dtc_model = dTree.fit(train)
# ... and add its predictions to the test split.
dtc_pred = dtc_model.transform(test)

In [61]:
from pyspark.ml.evaluation import RegressionEvaluator

# Compare `prediction` with `target` on the decision-tree test predictions.
# The figure printed as "accuracy" is the r2 metric multiplied by 100.
evaluator = RegressionEvaluator(
    metricName="r2", labelCol="target", predictionCol="prediction")
accuracy = evaluator.evaluate(dtc_pred)
print("Accuracy on test data = %g" % (accuracy * 100))

# Same predictions scored with the rmse metric.
evaluator1 = RegressionEvaluator(
    metricName="rmse", labelCol="target", predictionCol="prediction")
rmse = evaluator1.evaluate(dtc_pred)
print("RMSE = %g" % rmse)

In [62]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6]).build()
crossval = CrossValidator(estimator=dTree,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # use 3+ folds in practice

crossval_rmse = CrossValidator(estimator=dTree,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator1,
                          numFolds=3)
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(final_data)
cvModel_rmse = crossval_rmse.fit(final_data)

In [63]:
cvPredictions = cvModel.transform(final_data) 
accuracy = evaluator.evaluate(cvPredictions) 
print("Model Accuracy with Cross Validation: ", accuracy*100)
cvPredictions1 = cvModel_rmse.transform(final_data) 
rmse = evaluator1.evaluate(cvPredictions1) 
print("RMSE: ", rmse)


In [64]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest regressor reading the 'features' vector and predicting 'target'.
rfr = RandomForestRegressor(featuresCol='features', labelCol='target')

In [65]:
# Fit the random forest on the training split and predict the test split.
rfr_model = rfr.fit(train)
rfr_pred = rfr_model.transform(test)

# The figure printed as "accuracy" is the r2 metric multiplied by 100.
evaluator = RegressionEvaluator(
    metricName="r2", labelCol="target", predictionCol="prediction")
accuracy = evaluator.evaluate(rfr_pred)
print("Accuracy on test data = %g" % (accuracy * 100))

# Same predictions scored with the rmse metric.
evaluator1 = RegressionEvaluator(
    metricName="rmse", labelCol="target", predictionCol="prediction")
rmse = evaluator1.evaluate(rfr_pred)
print("RMSE = %g" % rmse)

In [66]:
# Candidate tree depths; one CrossValidator per metric (r2 and rmse).
paramGrid = ParamGridBuilder().addGrid(rfr.maxDepth, [2,3,4,5,6]).build()
crossval = CrossValidator(estimator=rfr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # use 3+ folds in practice

crossval_rmse = CrossValidator(estimator=rfr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator1,
                          numFolds=3)

# Run cross-validation, and choose the best set of parameters.
# Tune on the training split only, so the test rows stay unseen until scoring.
cvModel = crossval.fit(train)
cvModel_rmse = crossval_rmse.fit(train)

# Score the tuned models on the held-out test split. Scoring on rows the
# model was fitted on would overstate its quality.
cvPredictions = cvModel.transform(test)
accuracy = evaluator.evaluate(cvPredictions)
print ("Model Accuracy with Cross Validation: ", accuracy*100)
cvPredictions1 = cvModel_rmse.transform(test)
rmse = evaluator1.evaluate(cvPredictions1)
print("RMSE: ", rmse)

In [67]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted-trees regressor reading the 'features' vector and
# predicting 'target'.
gbr = GBTRegressor(featuresCol='features', labelCol='target')

In [68]:
# Fit the gradient-boosted trees on the training split and predict the test split.
GBR_model = gbr.fit(train)
GBR_pred = GBR_model.transform(test)

# The figure printed as "accuracy" is the r2 metric multiplied by 100.
evaluator = RegressionEvaluator(
    metricName="r2", labelCol="target", predictionCol="prediction")
accuracy = evaluator.evaluate(GBR_pred)
print("Accuracy on test data = %g" % (accuracy * 100))

# Same predictions scored with the rmse metric.
evaluator1 = RegressionEvaluator(
    metricName="rmse", labelCol="target", predictionCol="prediction")
rmse = evaluator1.evaluate(GBR_pred)
print("RMSE = %g" % rmse)

In [69]:
# Candidate tree depths; one CrossValidator per metric (r2 and rmse).
paramGrid = ParamGridBuilder().addGrid(gbr.maxDepth, [2,3,4,5,6]).build()
crossval = CrossValidator(estimator=gbr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # use 3+ folds in practice

crossval_rmse = CrossValidator(estimator=gbr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator1,
                          numFolds=3)
# Run cross-validation, and choose the best set of parameters.
# Tune on the training split only, so the test rows stay unseen until scoring.
cvModel = crossval.fit(train)
cvModel_rmse = crossval_rmse.fit(train)

# Score the tuned models on the held-out test split. Scoring on rows the
# model was fitted on would overstate its quality.
cvPredictions = cvModel.transform(test)
accuracy = evaluator.evaluate(cvPredictions)
print ("Model Accuracy with Cross Validation: ", accuracy*100)
cvPredictions1 = cvModel_rmse.transform(test)
rmse = evaluator1.evaluate(cvPredictions1)
print("RMSE: ", rmse)

In [70]:
randomforest = rfr_pred.toPandas()

import matplotlib.pyplot as plt
%matplotlib inline
fig, ax = plt.subplots()
ax.plot(randomforest.prediction, label='Predicted')
ax.plot(randomforest.target, label='Target')
ax.set_xlabel('predicted date')
ax.set_ylabel('Number of trips')
ax.legend()
display(fig)

References:
1. https://forums.databricks.com/
2. https://stackoverflow.com/questions/tagged/databricks