## Citibike - NYC Sampling

In [49]:
import numpy as np
import pandas as pd
import matplotlib.cm
import matplotlib.pyplot as plt
import matplotlib.ticker as tkr

import matplotlib.lines as mlines
import matplotlib.dates as mdates
import matplotlib.cbook as cbook
import time

import findspark
findspark.init()

from geopy.distance import geodesic

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum, avg, udf, to_timestamp, date_trunc
from pyspark.sql.functions import year, month, hour, dayofweek
from pyspark.sql.functions import round, concat, col, lit

from pyspark.sql.types import FloatType, StructType, IntegerType, StringType, DoubleType, StructField, TimestampType, DateType
from pyspark.sql.types import TimestampType

import random

spark1 = SparkSession.builder.appName("CB").getOrCreate()

import datetime as dt
print("modules imported")

randomSeed = 1984

pathWeather = "/users/sajudson/Dropbox/WPI/DS504/project/weather/"
pathData = "/users/sajudson/Dropbox/WPI/DS504/project/data/"
pathFigure = "/users/sajudson/Dropbox/WPI/DS504/project/figures/"


plt.style.use('ggplot')

modules imported


## Select City and set filenames for input and output 

In [54]:


tripSampleFraction = .20
stationSampleFraction = .20


#Use NYC weather data for both data sets
weatherRaw = "NYC"+'weatherRaw'
weatherFeatures = "NYC"+'weatherFeatures'
weather_file_type = 'csv'

#SELECT WHICH CITY TO USE FOR ANALYSIS 
city = "NYC"
#city = "JC"
sample = str(int(tripSampleFraction*100))

filenameRaw = "citibike"+city+"Raw_sample_"+sample
filenameInput = filenameRaw+".csv"
filenameBF4 = "citibike"+city+"bf3_sample_"+sample
filenameOutput = filenameBF4+".csv"

print(filenameInput, ", ",filenameOutput)

citibikeNYCRaw_sample_20.csv ,  citibikeNYCbf3_sample_20.csv


In [55]:
# File location and type
file_location = pathData+filenameInput
file_type = "csv"
print(file_location)

/users/sajudson/Dropbox/WPI/DS504/project/data/citibikeNYCRaw_sample_20.csv


# Import Trip Data

In [56]:

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bf = spark1.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(bf)

DataFrame[tripduration: string, starttime: string, stoptime: string, start station id: string, start station name: string, start station latitude: string, start station longitude: string, end station id: string, end station name: string, end station latitude: string, end station longitude: string, bikeid: string, usertype: string, birth year: string, gender: string]

## Sample DataFrame & Cleanup



In [57]:
t0= time.time()
#bf1 = bf.sample(False,tripSampleFraction,randomSeed)
bf1 = bf

for col in bf1.columns:
  bf1 = bf1.withColumnRenamed(col,col.replace(" ", "_"))
bf1.dropna()
bf1.describe().show()
#print(time.time()-t0)

+-------+------------------+--------------------+--------------------+------------------+------------------+----------------------+-----------------------+------------------+----------------+--------------------+---------------------+------------------+----------+-----------------+------------------+
|summary|      tripduration|           starttime|            stoptime|  start_station_id|start_station_name|start_station_latitude|start_station_longitude|    end_station_id|end_station_name|end_station_latitude|end_station_longitude|            bikeid|  usertype|       birth_year|            gender|
+-------+------------------+--------------------+--------------------+------------------+------------------+----------------------+-----------------------+------------------+----------------+--------------------+---------------------+------------------+----------+-----------------+------------------+
|  count|           6668097|             6668097|             6668097|           6667711|     

# Create a view or table

In [58]:
temp_table_name = filenameRaw

bf1.createOrReplaceTempView(temp_table_name)

# Add column with calculated distance between stations 

In [59]:


t0 = time.time()
@udf('double')
def geodistance(start_latitude,start_longitude,end_latitude,end_longitude):
    d = geodesic((start_latitude,start_longitude),(end_latitude,end_longitude)).miles
    return (d)



bf1 = bf1.withColumn('distance',geodistance(bf1.start_station_latitude,bf1.start_station_longitude,
                                          bf1.end_station_latitude, bf1.end_station_longitude)
              )
bf1 = bf1.withColumn('tripduration_min', bf1.tripduration/60)

maxtripduration = 24*60
maxdistance = 50

bf1.filter(bf1['distance']<maxdistance)
bf1.filter(bf1['tripduration_min']<maxtripduration)

print(bf1.count(), len(bf1.columns))
#print(bf1.columns)
t1 = time.time()
print(t1-t0)

6668097 17
6.9388978481292725


In [60]:
t0 = time.time()
#bf1.show()
t1 = time.time()
print(t1-t0)

+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+------------------+------------------+
|tripduration|          starttime|           stoptime|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bikeid|  usertype|birth_year|gender|          distance|  tripduration_min|
+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+------------------+------------------+
|         508|2016-10-02 13:04:16|2016-10-02 13:12:44|           322.0|Clinton St & Till...|    40.696191999999996|      

In [9]:
#bf1.cache()


In [10]:
#bf1quantile = bf1.approxQuantile(["tripduration_min", "distance"], [0.5], 0.25)

# Drop columns that are not required


In [61]:
#df_new = df.drop(*drop_these)

t = time.time()
dropFeatures =[      'start_station_name',
                     'start_station_latitude', 'start_station_longitude', 
                     'end_station_name', 'end_station_latitude', 'end_station_longitude',
                     'bikeid', 'birth_year', 'gender']

median_distance = 0.62


bf2 = bf1.select([column for column in bf1.columns if column not in dropFeatures])


# Convert time and date string to timestamp, round down to hour


In [62]:
#print(time.time()-t)

#timestamps in m/d/yyyy hh:mm:ss format prior prior to 2016-10
#




bf2 = bf2.withColumn("starttime", to_timestamp(bf2.starttime,"yyyy-MM-dd HH:mm:ss").cast("timestamp"))
bf2 = bf2.withColumn("starttime", date_trunc("hour", bf2.starttime))
bf2 = bf2.withColumn("stoptime", to_timestamp(bf2.stoptime,"yyyy-MM-dd HH:mm:ss").cast("timestamp"))
bf2 = bf2.withColumn("stoptime", date_trunc("hour", bf2.stoptime))

#Check output
#t0 = time.time()
#bf2.show()
#print(time.time()-t0)                  


# Create new features based on distance between stations
Indicator variables
- local (distance = 0, bike returned to same station)
- short (distance <= median distance)
- long (distance > median distance)

In [63]:



@udf
def setShort(d):
    if d > 0 and d <= median_distance:
        return 1
    else:
        return 0

    
@udf    
def setLocal(d):
    if (d==0): 
        return(1)
    else: 
        return(0)
    

@udf    
def setLong(d):
    if (d> median_distance): 
        return(1)
    else: 
        return(0)

    

bf2 = bf2.withColumn('local', setLocal(bf2.distance).cast("integer"))
bf2 = bf2.withColumn('short', setShort(bf2.distance).cast("integer"))
bf2 = bf2.withColumn('long', setLong(bf2.distance).cast("integer"))
bf2 = bf2.withColumn('start_station_id', bf2['start_station_id'].cast('integer'))
bf2 = bf2.withColumn('end_station_id', bf2['end_station_id'].cast('integer'))
#bf2.show()
#bf2.cache

# Create dataframe grouped by timestamp and station id 
- Split trip dataframe in to supply (trips ending at a station)  and demand (trips starting at a station) dataframes.
- Aggregrate by hour and station id
- Join supply and demand dataframes into single data frame based on hour and station id
- Calculate net demand at each station


In [64]:
demandDropFeatures =['tripduration', 'usertype', 'distance',
                     'stoptime', 
                     'subscriber','customer',
                     'end_station_id', 
                     'tripduration_min',
                     ]
supplyDropFeatures =['tripduration',  'usertype', 'distance',
                     'starttime', 
                     'subscriber','customer',
                     'start_station_id',
                     'tripduration_min',
                     ]


bf2_demand = bf2.select([column for column in bf2.columns if column not in demandDropFeatures])
bf2_demand = bf2_demand.withColumnRenamed("starttime", "datetime").withColumnRenamed("start_station_id","station_id")

bf2_supply = bf2.select([column for column in bf2.columns if column not in supplyDropFeatures])
bf2_supply = bf2_supply.withColumnRenamed("stoptime", "datetime").withColumnRenamed("end_station_id","station_id")

bf2_d_agg = bf2_demand.groupBy("datetime","station_id").agg(sum("local"), sum("short"),sum("long")).orderBy('datetime', 'station_id' )
bf2_d_agg = bf2_d_agg.withColumnRenamed("sum(local)", "local_demand").withColumnRenamed("sum(short)", "short_demand").withColumnRenamed("sum(long)", "long_demand")

bf2_s_agg = bf2_supply.groupBy("datetime","station_id").agg(sum("local"), sum("short"),sum("long"))
bf2_s_agg = bf2_s_agg.withColumnRenamed("sum(local)", "local_supply").withColumnRenamed("sum(short)", "short_supply").withColumnRenamed("sum(long)", "long_supply")

bf3 = bf2_d_agg.join(bf2_s_agg, ['datetime','station_id'])

bf3 = bf3.withColumn('totalDemand', (bf3.local_demand+bf3.short_demand+bf3.long_demand))
bf3 = bf3.withColumn('totalSupply', ((bf3.local_supply+bf3.short_supply+bf3.long_supply)))


#remove short local long and net demand features (reinstate later if feasible/necessary)
#bf3 = bf3.withColumn('netDemand', (bf3.totalDemand)-(bf3.totalSupply))

dropFeatures3 =['short_demand',  'long_demand', 'local_demand',
               'short_supply',  'long_supply', 'local_supply',]


bf3 = bf3.select([column for column in bf3.columns if column not in dropFeatures3])



In [65]:
#bf3.show()

+-------------------+----------+-----------+-----------+
|           datetime|station_id|totalDemand|totalSupply|
+-------------------+----------+-----------+-----------+
|2016-10-01 11:00:00|      3428|          3|          1|
|2016-10-01 12:00:00|       301|          4|          2|
|2016-10-01 12:00:00|      3398|          1|          1|
|2016-10-01 14:00:00|       309|          1|          2|
|2016-10-01 14:00:00|       356|          2|          1|
|2016-10-01 16:00:00|       281|          1|          1|
|2016-10-01 16:00:00|       312|          1|          2|
|2016-10-01 16:00:00|       444|          5|          6|
|2016-10-01 16:00:00|      3092|          2|          2|
|2016-10-01 17:00:00|       432|          5|          1|
|2016-10-01 19:00:00|       167|          1|          3|
|2016-10-01 19:00:00|       523|          1|          1|
|2016-10-01 20:00:00|      3435|          3|          3|
|2016-10-02 11:00:00|       251|          2|          6|
|2016-10-02 13:00:00|      3052

## Sample based on stations


In [66]:
t0= time.time()
#create DF with unique station ids
stationDF = bf3.select("station_id").distinct().orderBy("station_id")
#print(stationDF.count())
#create DF with sample of station is
sampleStationDF = stationDF.sample(False,stationSampleFraction,randomSeed)
#convertsample station dataframe to a list
sampleStationList = sampleStationDF.select("station_id").rdd.map(lambda row : row[0]).collect()                         
print(len(sampleStationList))
bf4 = bf3.filter(bf3.station_id.isin(sampleStationList))
print(sampleStationList)

#bf4.show()
print(time.time()-t0)

180
[72, 116, 119, 143, 150, 152, 153, 161, 168, 195, 238, 244, 245, 252, 255, 259, 260, 275, 278, 279, 280, 285, 289, 307, 312, 319, 322, 324, 344, 346, 347, 348, 349, 351, 365, 369, 373, 395, 409, 416, 445, 454, 457, 460, 461, 467, 468, 472, 488, 490, 500, 526, 530, 534, 536, 537, 2002, 2003, 2004, 2012, 2017, 3002, 3041, 3049, 3052, 3061, 3073, 3075, 3076, 3077, 3078, 3098, 3102, 3105, 3107, 3113, 3120, 3121, 3123, 3124, 3135, 3140, 3148, 3150, 3158, 3165, 3166, 3175, 3219, 3224, 3241, 3246, 3249, 3255, 3283, 3296, 3306, 3308, 3311, 3312, 3315, 3318, 3322, 3330, 3333, 3338, 3341, 3342, 3344, 3346, 3347, 3348, 3353, 3354, 3357, 3359, 3368, 3369, 3373, 3376, 3383, 3387, 3392, 3393, 3396, 3397, 3407, 3415, 3417, 3424, 3427, 3441, 3447, 3459, 3464, 3467, 3478, 3496, 3497, 3498, 3508, 3510, 3513, 3516, 3522, 3525, 3528, 3536, 3541, 3551, 3559, 3560, 3561, 3563, 3564, 3568, 3570, 3571, 3576, 3580, 3586, 3599, 3601, 3604, 3606, 3613, 3615, 3617, 3620, 3631, 3632, 3644, 3647, 3648, 3654, 36

Save file to disk


In [67]:
t0= time.time()
bf4.write.csv(pathData+filenameOutput, mode="overwrite", header=True, sep=",")
print(time.time()-t0)

# this operation takes ~140 seconds without bf4 in cache for JC data at 100%
# this operation takes ~685 seconds without bf4 in cache for NYC data at 120%


685.344743013382


Describe file save to disk for later comparison

In [68]:
t0= time.time()
bf4.describe().show()
print(time.time()-t0)
bf4.show()
#
# this operation takes ~147 seconds with bf4 in cache

+-------+------------------+------------------+------------------+
|summary|        station_id|       totalDemand|       totalSupply|
+-------+------------------+------------------+------------------+
|  count|            365562|            365562|            365562|
|   mean|1523.4033898490543|  2.25856899787177|2.2852265826316738|
| stddev|1410.4577855968032|1.8102462485889539|1.8422900745279007|
|    min|                72|                 1|                 1|
|    max|              3680|                49|                37|
+-------+------------------+------------------+------------------+

656.0118918418884
+-------------------+----------+-----------+-----------+
|           datetime|station_id|totalDemand|totalSupply|
+-------------------+----------+-----------+-----------+
|2016-10-01 16:00:00|       312|          1|          2|
|2016-10-02 13:00:00|      3052|          1|          2|
|2016-10-03 12:00:00|       472|          5|          2|
|2016-10-03 14:00:00|      3427|    

In [None]:
qccheck = bf4.groupBy("station_id").agg(sum("totalDemand"), sum("totalSupply"),count("station_id"))

In [70]:
qccheck.show(180)

+----------+----------------+----------------+-----------------+
|station_id|sum(totalDemand)|sum(totalSupply)|count(station_id)|
+----------+----------------+----------------+-----------------+
|      3175|            6677|            6753|             3593|
|      3098|              57|              57|               50|
|      3561|             105|             106|              100|
|      3105|             936|             953|              727|
|       255|              13|              12|               10|
|      3631|              35|              29|               28|
|       472|           20082|           20019|             6901|
|      3306|            1184|            1193|              926|
|       322|            2698|            2786|             1742|
|      3601|              72|              75|               67|
|      3441|             113|              97|               85|
|      3124|            2495|            2492|             1647|
|       530|           14

## Code after this cell has been moved to other notebooks