In [1]:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import datetime
from pyspark.ml.regression import GBTRegressor

In [2]:
# --- BART feeds: normalized parquet stored under the 2017/03/04 prefix ---
bart_arrival = spark.read.parquet(
    "s3a://normalized-data-weather-bart/bart_arrival_0_2017/03/04/*"
)
bart_physical = spark.read.parquet(
    "s3a://normalized-data-weather-bart/bart_physical_0_2017/03/04/*"
)

# --- Weather feeds: wind, temperature and textual description, same prefix ---
wind_table = spark.read.parquet(
    "s3a://normalized-data-weather-bart/wind_df2017/03/04/*"
)
main_temp_table = spark.read.parquet(
    "s3a://normalized-data-weather-bart/main-temp2017/03/04/*"
)
weather_description_table = spark.read.parquet(
    "s3a://normalized-data-weather-bart/weather-description2017/03/04/*"
)

In [3]:
# Rename every arrival-feed column so it cannot be confused with the
# identically named "*_0" columns of bart_physical once the two are joined.
arrival_column_aliases = [
    ('origin_station_0', 'origin_station_arrival'),
    ("sf_time_0", "sf_time_arrival"),
    ("date_0", "date_arrival"),
    ("direction_0", "direction_arrival"),
    ("hour_0", "hour_arrival"),
    ("minutes_til_arrival_0", "minutes_til_arrival_bart_arrival"),
]
bart_arrival_renamed = bart_arrival.select(
    *[col(source_name).alias(new_name) for source_name, new_name in arrival_column_aliases]
)

In [51]:
# Sample date string (month/day/year) used to prototype the date parsing below.
t = '03/03/2017'

In [37]:
# Parse the sample date string. %m is the month directive; the previous %M is
# the *minutes* directive, which left the month at its default of January.
r = datetime.datetime.strptime(t, '%m/%d/%Y')

In [57]:
# Scratch check of the slice positions for each date part; only the value of
# the last expression is echoed as the cell output.
t[:2] # month
t[3:5] #day
t[6:] #year

'2017'

In [72]:
def change_time_format(time):
    """Convert an ``MM/DD/YYYY`` date string into a ``datetime.datetime``.

    The string is sliced positionally: characters 0-1 are the month,
    3-4 the day, and everything from index 6 onward the year.

    Args:
        time: Date string such as ``'03/03/2017'``.

    Returns:
        A ``datetime.datetime`` at midnight of the given day.

    Raises:
        ValueError: If a sliced part is not an integer or the parts do not
            form a valid calendar date.
    """
    # Slice the argument itself; the previous version read the notebook-level
    # variable `t`, so every call returned the same date regardless of input.
    month = time[:2]
    day = time[3:5]
    year = time[6:]
    return datetime.datetime(int(year), int(month), int(day))

In [73]:
# Sanity-check the helper on the sample date string.
change_time_format(t)

datetime.datetime(2017, 3, 3, 0, 0)

In [8]:
# Match each arrival record to the physical-train record that shares the same
# station, snapshot time, direction and date.
arrival_physical_keys = [
    bart_physical['origin_station_0'] == bart_arrival_renamed['origin_station_arrival'],
    bart_physical['sf_time_0'] == bart_arrival_renamed['sf_time_arrival'],
    bart_physical['direction_0'] == bart_arrival_renamed['direction_arrival'],
    bart_physical['date_0'] == bart_arrival_renamed['date_arrival'],
]
joined_bart_arrival_physical_df = bart_arrival_renamed.join(
    bart_physical, on=arrival_physical_keys
)

In [9]:
# Keep the physical-feed columns plus the single arrival-feed measure.
joined_bart_columns = [
    "origin_station_0",
    "sf_time_0",
    "date_0",
    "direction_0",
    "destination_0",
    "hour_0",
    "color_0",
    "bike_flag_0",
    "train_size_0",
    "capacity_0",
    "minutes_til_arrival_bart_arrival",
]
final_joined_bart_df = joined_bart_arrival_physical_df.select(*joined_bart_columns)

In [7]:
# Peek at two rows to confirm the join produced the expected columns.
final_joined_bart_df.show(2)

+--------------------+---------------+----------+-----------+-------------+------+-------+-----------+------------+----------+--------------------------------+
|    origin_station_0|      sf_time_0|    date_0|direction_0|destination_0|hour_0|color_0|bike_flag_0|train_size_0|capacity_0|minutes_til_arrival_bart_arrival|
+--------------------+---------------+----------+-----------+-------------+------+-------+-----------+------------+----------+--------------------------------+
|Pleasant Hill/Con...|05:50:01 PM PST|03/03/2017|      North|North Concord|     1| YELLOW|          1|           9|      1800|                              13|
|Pleasant Hill/Con...|05:50:01 PM PST|03/03/2017|      South|   SF Airport|     1| YELLOW|          1|          10|      2000|                              13|
+--------------------+---------------+----------+-----------+-------------+------+-------+-----------+------------+----------+--------------------------------+
only showing top 2 rows



In [75]:
## convert the string date to a Spark date column
# date_0 arrives as an 'MM/dd/yyyy' string (e.g. '03/03/2017'). Parse it with
# built-in column functions so this cell runs on a fresh kernel; the previous
# version called `rearrange_time`, which is never defined in this notebook.
final_joined_bart_df_dt = final_joined_bart_df.withColumn(
    'date_0',
    unix_timestamp(col('date_0'), 'MM/dd/yyyy').cast('timestamp').cast('date')
)

In [76]:
# Inspect the converted frame; date_0 should now print as yyyy-MM-dd.
final_joined_bart_df_dt.show()

+--------------------+---------------+----------+-----------+-------------------+------+-------+-----------+------------+----------+--------------------------------+
|    origin_station_0|      sf_time_0|    date_0|direction_0|      destination_0|hour_0|color_0|bike_flag_0|train_size_0|capacity_0|minutes_til_arrival_bart_arrival|
+--------------------+---------------+----------+-----------+-------------------+------+-------+-----------+------------+----------+--------------------------------+
|Pleasant Hill/Con...|05:50:01 PM PST|2017-03-03|      North|      North Concord|     1| YELLOW|          1|           9|      1800|                              13|
|Pleasant Hill/Con...|05:50:01 PM PST|2017-03-03|      South|         SF Airport|     1| YELLOW|          1|          10|      2000|                              13|
|    19th St. Oakland|05:50:01 PM PST|2017-03-03|      North|      North Concord|     1| YELLOW|          1|          10|      2000|                               3|
|   

In [83]:
# Derive day-of-month and month number from date_0 and keep only the columns
# that the aggregation query reads.
bart_feature_columns = [
    dayofmonth(col('date_0')).alias("day_of_month"),
    month(col("date_0")).alias("month_n"),
    "origin_station_0",
    "sf_time_0",
    "date_0",
    "direction_0",
    "hour_0",
    "bike_flag_0",
    "train_size_0",
    "capacity_0",
    "minutes_til_arrival_bart_arrival",
]
final_bart_df = final_joined_bart_df_dt.select(*bart_feature_columns)

In [84]:
# Expose the frame to Spark SQL under the table name "final_bart_df".
final_bart_df.createOrReplaceTempView("final_bart_df")

In [85]:
# Aggregate per station / date / direction, using only rows whose
# minutes_til_arrival_bart_arrival is below 5: count the rows, and sum the
# train sizes and capacities. Results are ordered by total capacity, descending.
# NOTE(review): this rebinds the Python name final_bart_df to the aggregate,
# while the SQL table "final_bart_df" still refers to the pre-aggregation
# frame. Re-running the registration cell after this one would register the
# aggregate instead, so the two cells are not safely re-runnable out of order.
final_bart_df = sqlContext.sql("""SELECT count(bike_flag_0) as total_number_of_trains,origin_station_0,date_0,direction_0,
    SUM (train_size_0) as total_number_train_cars, sum(capacity_0) as total_capacity,month_n,day_of_month
     FROM  final_bart_df
                WHERE minutes_til_arrival_bart_arrival <5 
                GROUP BY origin_station_0,date_0,direction_0,month_n,day_of_month
                ORDER BY total_capacity DESC""")

In [86]:
## bring in weather data to ultimately join with bart data and feed into our ML model

In [87]:
# Rename the wind table's join keys so they stay distinguishable from the
# "hour"/"date" columns of the temperature table after the join.
wind_table_alias = wind_table.select(
    col("hour").alias("hour_wind"),
    col("date").alias("date_wind"),
    "speed",
    "deg"
)

In [88]:
# Combine wind and temperature readings that share the same hour and date.
wind_matches_temp_hour = wind_table_alias['hour_wind'] == main_temp_table['hour']
wind_matches_temp_date = wind_table_alias['date_wind'] == main_temp_table['date']
wind_temp_table = wind_table_alias.join(
    main_temp_table, on=[wind_matches_temp_hour, wind_matches_temp_date]
)

In [89]:
# Drop the aliased wind keys; keep one hour/date pair plus the measurements.
wind_temp_columns = ["hour", "date", "speed", "deg", "pressure", "temp", "temp_max", "temp_min"]
wind_temp_table_final = wind_temp_table.select(*wind_temp_columns)

In [90]:
# The description text is stored in a column literally named "col";
# give it a meaningful name.
weather_des_final = weather_description_table.select(
    col("col").alias('weather_des'),
    "hour",
    "date"
)

In [91]:
# Register the frame as a temp view for the SQL query that follows.
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0.
weather_des_final.createOrReplaceTempView("weather_des_final")

In [92]:
# Turn the textual weather description into a numeric precipitation score:
# 'Rain' -> 1.0, 'Mist' -> .1, anything else -> 0.0. The hour/date columns are
# renamed to hour_des/date_des for the later join.
weather_des_ints = sqlContext.sql("""SELECT hour as hour_des,date as date_des,weather_des,
     CASE WHEN weather_des = 'Rain' THEN 1.0
         WHEN weather_des = 'Mist' THEN .1
         ELSE 0.0 end as weather_precip
       FROM  weather_des_final  
        """)

In [93]:
# Only the join keys and the numeric score are needed from here on.
weather_des_ints_final = weather_des_ints.select(
    "hour_des",
    "date_des",
    "weather_precip"
)

In [94]:
## join weather des with the other two weather tables

# Rows match when both the date and the hour agree.
same_date = wind_temp_table_final['date'] == weather_des_ints_final['date_des']
same_hour = wind_temp_table_final['hour'] == weather_des_ints_final['hour_des']
combo_df = wind_temp_table_final.join(weather_des_ints_final, on=[same_date, same_hour])

In [95]:
# Weather measurements plus the precipitation score, without the *_des join keys.
weather_feature_names = [
    "hour", "date", "speed", "deg", "pressure",
    "temp", "temp_max", "temp_min", "weather_precip",
]
final_df = combo_df.select(*weather_feature_names)

In [96]:
# convert the date string into the date format for spark
# NOTE(review): `rearrange_time` is not defined anywhere in the visible
# notebook, so this cell depends on kernel state from a cell that no longer
# exists — confirm its definition and restore it near the imports.
# NOTE(review): the cell rebinds combo_df, so running it twice applies the
# conversion to an already-converted column.
combo_df = combo_df.withColumn('date', rearrange_time(col('date')))

In [97]:
## finally, select all of the columns
## drop duplicates to ensure we have only one weather forecast per hour
weather_columns = [
    dayofmonth(col('date')).alias("day_of_month_weather"),
    month(col("date")).alias("month_n_weather"),
    "hour",
    "date",
    "speed",
    "deg",
    "pressure",
    "temp",
    "temp_max",
    "temp_min",
    "weather_precip",
]
# A (day, month, hour) triple identifies one forecast slot.
hourly_slot_key = ['day_of_month_weather', 'month_n_weather', 'hour']
final_weather_df = combo_df.select(*weather_columns).dropDuplicates(hourly_slot_key)

In [98]:
## confirm one forecast per hour
# final_weather_df.where(col("hour")=='9').show()

In [99]:
# Mark the weather frame for caching; it is displayed and joined in later cells.
final_weather_df.cache()

DataFrame[day_of_month_weather: int, month_n_weather: int, hour: int, date: date, speed: double, deg: double, pressure: bigint, temp: double, temp_max: double, temp_min: double, weather_precip: decimal(2,1)]

In [100]:
# Preview the de-duplicated hourly weather rows.
final_weather_df.show()

+--------------------+---------------+----+----------+-----+-------+--------+-----+--------+--------+--------------+
|day_of_month_weather|month_n_weather|hour|      date|speed|    deg|pressure| temp|temp_max|temp_min|weather_precip|
+--------------------+---------------+----+----------+-----+-------+--------+-----+--------+--------+--------------+
|                   3|              3|   1|2017-03-03|10.29|  340.0|    1009|61.43|    80.6|    53.6|           0.0|
|                   3|              3|   8|2017-03-03| 8.05|  250.0|    1016|50.65|    77.0|    30.2|           0.0|
|                   3|              3|  10|2017-03-03| 7.18|270.006|    1036|50.36|    75.2|    30.2|           1.0|
|                   3|              3|  19|2017-03-03|12.75|  210.0|    1017|57.63|    62.6|    53.6|           0.0|
|                   3|              3|   9|2017-03-03| 9.17|  260.0|    1016|47.88|    53.6|    30.2|           0.0|
|                   3|              3|   3|2017-03-03|19.46|  27

In [101]:
# Mark the aggregated BART frame for caching; it is displayed and joined in later cells.
final_bart_df.cache()

DataFrame[total_number_of_trains: bigint, origin_station_0: string, date_0: date, direction_0: string, total_number_train_cars: bigint, total_capacity: bigint, month_n: int, day_of_month: int]

In [102]:
# Preview the per-station aggregates (ordered by total capacity, descending).
final_bart_df.show()

+----------------------+--------------------+----------+-----------+-----------------------+--------------+-------+------------+
|total_number_of_trains|    origin_station_0|    date_0|direction_0|total_number_train_cars|total_capacity|month_n|day_of_month|
+----------------------+--------------------+----------+-----------+-----------------------+--------------+-------+------------+
|                   369|           Fruitvale|2017-03-03|      North|                   3017|        603400|      3|           3|
|                   218|           Fruitvale|2017-03-03|      South|                   1910|        382000|      3|           3|
|                   133|            Millbrae|2017-03-03|      North|                   1248|        249600|      3|           3|
|                   114|         Balboa Park|2017-03-03|      South|                    893|        178600|      3|           3|
|                    99|West Dublin/Pleas...|2017-03-03|      South|                    873|     

In [36]:
# Attach weather to the BART aggregates. The only join key is the calendar
# date: the weather frame holds one row per hour while the BART frame is
# grouped per station/date/direction, so each BART row is paired with every
# hourly weather row of the same date.
dates_match = final_weather_df['date'] == final_bart_df['date_0']
temp_table = final_weather_df.join(final_bart_df, on=[dates_match])

In [38]:
# Final feature set: BART aggregates first, then the weather measurements.
bart_columns = [
    "day_of_month", "month_n", "total_capacity", "total_number_train_cars",
    "direction_0", "date_0", "origin_station_0",
]
weather_measure_columns = [
    "weather_precip", "speed", "deg", "pressure", "temp", "temp_max", "temp_min",
]
final_bart_weather_table = temp_table.select(*(bart_columns + weather_measure_columns))

In [39]:
## next, load the GB model

In [57]:
# NOTE(review): this call fails as written. The recorded traceback shows the
# loader looking for "<path>/metadata" and the wildcard pattern matching 0 files.
# Presumably the path must name the single directory the model was saved to
# (no globs), and a fitted model is normally loaded through its *Model class
# rather than the estimator — TODO confirm against how the model was written.
gb_model = GBTRegressor.load("s3a://predicting-bart-ridership-model/*/*/*")

Py4JJavaError: An error occurred while calling o1037.load.
: org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3a://predicting-bart-ridership-model/*/*/*/metadata matches 0 files
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:253)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1332)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1367)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1366)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:379)
	at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:322)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
# NOTE(review): leftover scratch cell — it has no execution count, and
# LogisticRegressionModel is not imported in the notebook's import cell.
# It also loads a logistic-regression model rather than the GBT model used
# above; confirm whether this cell should be removed.
sameModel = LogisticRegressionModel.load(sc, "lrm_model.model")