## New York City Bike
"Citi Bike is the nation's largest bike share program, with 10,000 bikes and 600 stations across Manhattan, Brooklyn, Queens and Jersey City. It was designed for quick trips with convenience in mind, and it’s a fun and affordable way to get around town."
Selected questions explored in this analysis:   
(1) Given the latitude and longitude of each station, what is the average distance per trip?  
(2) How often are bikes removed from stations for maintenance? This is detected by comparing, for each bike, the pickup station of each trip with the drop-off station of the previous trip.   
(3) Which stations have abnormally high usage, compared with the other stations, at different hours of the day?

In [2]:
data_links=['https://s3.amazonaws.com/tripdata/201501-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201502-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201503-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201504-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201505-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201506-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201507-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201508-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201509-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201510-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201511-citibike-tripdata.zip',
           'https://s3.amazonaws.com/tripdata/201512-citibike-tripdata.zip']


In [3]:
dbutils.fs.mkdirs("/NYC bikes/")

In [4]:
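# download the monthly zip files and copy the CSV inside each one to DBFS
import os
import zipfile
import tempfile
from io import BytesIO
from requests import Session
try:
  from urllib.parse import urlparse  # Python 3
except ImportError:
  from urlparse import urlparse      # Python 2

def download_to_spark(URL_link, unzip=True):
  session = Session()
  tmp = BytesIO()
  f = session.get(URL_link, headers={'User-Agent': 'Databricks'}, stream=True)
  f.raise_for_status()
  for chunk in f.iter_content(chunk_size=1024):
    if chunk:  # skip keep-alive chunks
      tmp.write(chunk)
  tmp.seek(0)

  if unzip and zipfile.is_zipfile(tmp):
    zf = zipfile.ZipFile(tmp)
    member = zf.namelist()[0]  # each monthly archive holds a single CSV
    content = zf.read(member)
  else:
    # not a zip (or unzip=False): keep the raw bytes under the file name from the URL
    member = os.path.basename(urlparse(URL_link).path)
    content = tmp.getvalue()

  # write to the driver's local disk first, then copy into DBFS
  with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv') as t:
    t.write(content)
  dbutils.fs.cp('file://{0}'.format(t.name), os.path.join('/NYC bikes/', member))
  os.unlink(t.name)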


In [5]:
for link in data_links:
  download_to_spark(link)

In [6]:
# check whether download is successful. 
display(dbutils.fs.ls("/NYC bikes/"))

In [7]:
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, StructField

bike_data_features= ['tripduration',
                   'starttime',
                   'stoptime',
                   'start station id',
                   'start station name',
                   'start station latitude',
                   'start station longitude',
                   'end station id',
                   'end station name',
                   'end station latitude',
                   'end station longitude',
                   'bikeid',
                   'usertype',
                   'birth year',
                   'gender']
bike_data_dtypes = [IntegerType(),
                    StringType(),
                    StringType(),
                    IntegerType(),
                    StringType(),
                    FloatType(),
                    FloatType(),
                    IntegerType(),
                    StringType(),
                    FloatType(),
                    FloatType(),
                    IntegerType(),
                    StringType(),
                    IntegerType(),
                    IntegerType()
                   ]

bike_data_schema = StructType([StructField(i,j,True) for i, j in zip(bike_data_features,bike_data_dtypes)])

In [8]:
# combine all the monthly tables into one dataframe
from functools import reduce

def read_month(file_path):
  return (sqlContext.read.format("com.databricks.spark.csv")
          .options(header=True)
          .schema(bike_data_schema)
          .load(file_path))

monthly_dfs = [read_month(f.path) for f in dbutils.fs.ls('/NYC bikes/')]
raw_bike_df = reduce(lambda left, right: left.unionAll(right), monthly_dfs)
raw_bike_df.cache()

In [9]:
print(raw_bike_df.count())                              # total rides
print(raw_bike_df.distinct().count())                   # distinct rides (duplicate check)
print(raw_bike_df.select('bikeid').distinct().count())  # distinct bikes

In [10]:
display(raw_bike_df.head(2))

In [11]:
# check the numerical columns for outliers
raw_bike_df.describe(['tripduration','start station latitude','start station longitude','end station latitude','end station longitude']).show()

The maximum trip duration is more than 1629 hours. Yet even annual members (https://www.citibikenyc.com/pricing) are charged extra for keeping a bike for more than 45 minutes in a single ride. 
So let's assume that entries with a duration longer than 24 hours (86,400 seconds) are errors.

In [13]:
display(raw_bike_df.filter(raw_bike_df.tripduration>86400).collect())

In [14]:
import matplotlib.pyplot as plt
import numpy as np

(1) Let's look at the median trip duration.

In [16]:
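# collect() brings every duration to the driver; workable for one year of trips but memory-heavy
tripduration = [row.tripduration for row in raw_bike_df.select('tripduration').dropna().collect()]
print('median trip duration is %.1f seconds' % np.median(tripduration))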

Most people use Citi bikes for short trips.

(2) The fraction of rides that start and end at the same station.

In [19]:
return_to_same_stop_df = raw_bike_df.filter(raw_bike_df['start station id']==raw_bike_df['end station id'])
ratio_to_the_same_stop = float(return_to_same_stop_df.count())/float(raw_bike_df.count())
print(ratio_to_the_same_stop)

Only about 2% of rides end at the same station where they started.

(3) The standard deviation of the number of stations visited by a bike.
A station counts as visited by a bike if it is the start or the end station of one of the bike's trips. So I first create one table with bikeid and start station id, and another with bikeid and end station id. Then I union the two tables and count the distinct stations per bikeid.

In [22]:
import pyspark.sql.functions as F
bike_start_df = raw_bike_df.select('bikeid','start station id').withColumnRenamed('start station id', 'station_visited')
bike_end_df = raw_bike_df.select('bikeid','end station id').withColumnRenamed('end station id','station_visited')
bike_stations_df = bike_start_df.unionAll(bike_end_df)
bike_stations_df = bike_stations_df.distinct().groupBy('bikeid').count()
std_stations_visited = bike_stations_df.agg(F.stddev('count'))
print(std_stations_visited.collect()[0][0])
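The union, distinct, and count steps above can be mirrored on a few hand-made trips in plain Python. This is a minimal sketch, not the Spark job itself: `stations_visited_per_bike` and the `(bikeid, start station, end station)` tuples are hypothetical, and `statistics.stdev` computes the sample standard deviation, which is what Spark's `stddev` (an alias of `stddev_samp`) returns.

```python
from collections import defaultdict
from statistics import stdev

def stations_visited_per_bike(trips):
    """Count the distinct stations (start or end) touched by each bike."""
    visited = defaultdict(set)
    for bike_id, start_station, end_station in trips:
        # a station counts as visited whether the bike left from it or arrived at it
        visited[bike_id].add(start_station)
        visited[bike_id].add(end_station)
    return {bike: len(stations) for bike, stations in visited.items()}

# hypothetical toy trips: (bikeid, start station id, end station id)
trips = [
    (1, 100, 200),
    (1, 200, 100),   # revisits stations 100 and 200
    (2, 100, 300),
    (2, 300, 400),
    (3, 500, 500),   # round trip: a single station
]
counts = stations_visited_per_bike(trips)
print(counts)                        # {1: 2, 2: 3, 3: 1}
print(stdev(list(counts.values())))  # 1.0
```

Using a set per bike plays the role of `distinct()` in the Spark version, so revisiting a station does not inflate the count.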


(4) The average length of a trip, in kilometers. Assume trips follow great-circle arcs from the start station to the end station, and ignore trips that start and end at the same station as well as those with obviously wrong data.
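For reference, this is the haversine formula that `great_circle_distance` below implements, with latitudes $\varphi$ and longitudes $\lambda$ in radians and $R$ the mean Earth radius (6371.0088 km in the code); the symbols are introduced here for illustration:

$$
\begin{aligned}
a &= \sin^2\!\left(\frac{\varphi_2-\varphi_1}{2}\right) + \cos\varphi_1\,\cos\varphi_2\,\sin^2\!\left(\frac{\lambda_2-\lambda_1}{2}\right)\\
c &= 2\,\operatorname{atan2}\!\left(\sqrt{a},\ \sqrt{1-a}\right)\\
d &= R\,c
\end{aligned}
$$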

In [24]:
geometry_df = raw_bike_df.filter(~(raw_bike_df['start station id']==raw_bike_df['end station id']))
geometry_df = geometry_df.select('tripduration',
                                 geometry_df['start station latitude'].alias('s_lat'), 
                                 geometry_df['start station longitude'].alias('s_lon'),
                                 geometry_df['end station latitude'].alias('e_lat'),
                                 geometry_df['end station longitude'].alias('e_lon'))

In [25]:
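from math import sin, cos, sqrt, atan2, radians
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def great_circle_distance(lat_s, lon_s, lat_e, lon_e):
  """Haversine distance in km between two (lat, lon) points given in degrees."""
  if None in (lat_s, lon_s, lat_e, lon_e):
    return None  # missing coordinates
  earth_radius = 6371.0088  # mean Earth radius in km
  lat1, lon1 = radians(lat_s), radians(lon_s)
  lat2, lon2 = radians(lat_e), radians(lon_e)

  lat_diff = lat2 - lat1
  lon_diff = lon2 - lon1

  a = sin(lat_diff / 2)**2 + cos(lat1) * cos(lat2) * sin(lon_diff / 2)**2
  c = 2 * atan2(sqrt(a), sqrt(1 - a))
  return earth_radius * c

# sanity check on plain floats: Warsaw to Poznan is roughly 278 km
print(great_circle_distance(52.2296756, 21.0122287, 52.406374, 16.9251681))

distance_udf = udf(great_circle_distance, FloatType())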


In [26]:
geometry_df = geometry_df.withColumn('distance', distance_udf('s_lat','s_lon','e_lat','e_lon'))
geometry_df = geometry_df.withColumn('speed', (geometry_df.distance/geometry_df.tripduration)*3600.0) # calculate the speed km/hr

In [27]:
geometry_df.describe('speed').show()

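A bike's average speed over a trip should be well below 40 km/h. Since the great-circle distance is a lower bound on the distance actually ridden, a trip with an implied speed above 40 km/h almost certainly contains an error.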

In [29]:
geometry_df.filter(geometry_df.speed >40).count()

In [30]:
geometry_corrected = geometry_df.filter(geometry_df.speed <40)
average_distance = geometry_corrected.agg(F.mean('distance')).collect()[0][0]
print(average_distance)

The average trip length is about 1.76 km.
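The cleaning rule for section (4) (drop same-station trips, drop trips with an implied speed of 40 km/h or more, then average the distances) can be sketched in plain Python on hypothetical trips. This assumes each trip is a `(duration_s, s_lat, s_lon, e_lat, e_lon)` tuple and uses a zero great-circle distance as a stand-in for the same-station filter; `haversine_km` and `average_trip_km` are illustrative names.

```python
from math import radians, sin, cos, sqrt, atan2

EARTH_RADIUS_KM = 6371.0088  # same mean radius as great_circle_distance
MAX_SPEED_KMH = 40.0         # same cutoff as the Spark filter

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi, dlam = phi2 - phi1, radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    return EARTH_RADIUS_KM * 2 * atan2(sqrt(a), sqrt(1 - a))

def average_trip_km(trips):
    """Mean distance of the plausible trips, or None if none remain.

    trips: iterable of (duration_s, s_lat, s_lon, e_lat, e_lon) tuples.
    """
    kept = []
    for duration_s, s_lat, s_lon, e_lat, e_lon in trips:
        km = haversine_km(s_lat, s_lon, e_lat, e_lon)
        if km == 0 or duration_s <= 0:
            continue  # round trip (stand-in for same station) or unusable duration
        speed_kmh = km / duration_s * 3600.0
        if speed_kmh < MAX_SPEED_KMH:
            kept.append(km)
    return sum(kept) / len(kept) if kept else None

# hypothetical trips: duration (s), start lat/lon, end lat/lon
toy_trips = [
    (600, 40.700, -74.0, 40.709, -74.0),    # about 1 km in 10 minutes: kept
    (60, 40.700, -74.0, 40.718, -74.0),     # about 2 km in 1 minute (~120 km/h): dropped
    (300, 40.750, -73.99, 40.750, -73.99),  # round trip: skipped
]
print(round(average_trip_km(toy_trips), 3))  # 1.001
```

The 120 km/h trip is discarded and the round trip is skipped, so only the roughly 1 km ride contributes to the mean.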

(5) Calculate the average duration of trips for each month in the year. We should see a seasonal pattern.

In [33]:
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def extract_month(start_time):
  try:
    t1 = datetime.strptime(start_time, '%m/%d/%Y %H:%M:%S')
  except ValueError:
    t1 = datetime.strptime(start_time, '%m/%d/%Y %H:%M')
  return t1.month
extract_month_udf = udf(extract_month, IntegerType())

In [34]:
duration_df = raw_bike_df.select('tripduration',extract_month_udf('starttime').alias('month'))
duration_mean_df = duration_df.groupBy('month').avg('tripduration')
# difference between the longest and the shortest monthly average trip duration, in seconds
duration_spread = duration_mean_df.agg(F.max('avg(tripduration)') - F.min('avg(tripduration)')).first()[0]
print(duration_spread)

In [35]:
duration_mean_df.sort('avg(tripduration)',ascending = False).show()

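Surprisingly, October and September have the longest average trip durations; I would have expected July and August, the summer peak, to lead. Note that these averages are computed on raw_bike_df, which still contains the trips longer than 24 hours flagged above, so a few extreme trips can shift a monthly mean.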
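The month extraction and the max-minus-min spread can be checked on a handful of hypothetical trips in plain Python. This sketch assumes the same two `starttime` layouts that `extract_month` handles; `parse_start` and `monthly_mean_and_spread` are illustrative names, not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime

# the two starttime layouts handled by extract_month
TIME_FORMATS = ('%m/%d/%Y %H:%M:%S', '%m/%d/%Y %H:%M')

def parse_start(start_time):
    """Parse a starttime string in either layout."""
    for fmt in TIME_FORMATS:
        try:
            return datetime.strptime(start_time, fmt)
        except ValueError:
            pass
    raise ValueError('unrecognised starttime: %r' % start_time)

def monthly_mean_and_spread(trips):
    """trips: iterable of (tripduration_s, starttime) pairs.

    Returns ({month: mean duration}, max mean - min mean).
    """
    totals = defaultdict(lambda: [0, 0])  # month -> [sum of durations, number of trips]
    for duration, start in trips:
        month = parse_start(start).month
        totals[month][0] += duration
        totals[month][1] += 1
    means = {m: s / float(n) for m, (s, n) in totals.items()}
    return means, max(means.values()) - min(means.values())

# hypothetical trips mixing both layouts
month_trips = [(600, '1/5/2015 8:05'), (900, '1/9/2015 17:40'),
               (1200, '9/2/2015 07:15:00'), (1500, '9/3/2015 18:30:12')]
means, spread = monthly_mean_and_spread(month_trips)
print(means)   # {1: 750.0, 9: 1350.0}
print(spread)  # 600.0
```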

(6) Next, we'll look at how busy the stations are. Let us define the hourly usage fraction of a station as the fraction of all rides starting at that station that leave during a specific hour.
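In symbols (notation introduced here for illustration), let $n_{s,h}$ be the number of rides that start at station $s$ during hour $h$. The cells that follow compute the station hourly usage fraction $f_{s,h}$, the system-wide hourly share $F_h$ (`system_ratio`), and their quotient $r_{s,h}$ (`over load ratio`):

$$
f_{s,h} = \frac{n_{s,h}}{\sum_{h'} n_{s,h'}}, \qquad
F_h = \frac{\sum_{s} n_{s,h}}{\sum_{s',h'} n_{s',h'}}, \qquad
r_{s,h} = \frac{f_{s,h}}{F_h}
$$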

In [38]:
def extract_hour(start_time):
  try:
    t1 = datetime.strptime(start_time, '%m/%d/%Y %H:%M:%S')
  except ValueError:
    t1 = datetime.strptime(start_time, '%m/%d/%Y %H:%M')
  return t1.hour
extract_hour_udf = udf(extract_hour, IntegerType())

In [39]:
raw_bike_df.select('bikeid').distinct().count()

In [40]:
# Let's look at overall bike usage rate
hourly_bike_df = raw_bike_df.select('start station id',extract_hour_udf('starttime').alias('hour'))
hourly_df = hourly_bike_df.groupBy('hour').count()
total_uses = float(hourly_df.agg(F.sum('count')).collect()[0][0])
hourly_df = hourly_df.select('hour','count',(hourly_df['count']/total_uses).alias('system_ratio'))
hourly_df = hourly_df.withColumnRenamed('count','hourly_count')

In [41]:
display(hourly_df.sort('system_ratio', ascending=False).collect())

Bike usage peaks around the morning and late-afternoon rush hours.

In [43]:
# rides per (station, hour), and each station's total rides over all hours
station_df = hourly_bike_df.groupBy(['start station id','hour']).count().sort('start station id','hour')
station_hourly_df = station_df.groupBy('start station id').sum('count')
display(station_hourly_df.take(2))

In [44]:
# attach each station's total rides and the system-wide hourly share to every (station, hour) row
station_fraction_df = station_df.join(station_hourly_df, 'start station id', 'left_outer').join(hourly_df,'hour','left_outer')
# fraction of the station's rides that start in this hour
station_fraction_df = station_fraction_df.withColumn('station hourly usage fraction',
                                                     station_fraction_df['count']/station_fraction_df['sum(count)'])

In [45]:
station_fraction_df = station_fraction_df.withColumn('over load ratio',
                                                     station_fraction_df['station hourly usage fraction']/station_fraction_df['system_ratio'])
over_load_station = station_fraction_df.sort('over load ratio',ascending=False)
over_load_station.first()

For example, at 3 a.m. station 3103 has an abnormally high usage fraction compared with the system as a whole.
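The over-load ratio can be reproduced on toy data in plain Python. This is a minimal sketch, assuming one hypothetical `(station_id, hour)` pair per ride; `overload_ratios` is an illustrative helper, not the Spark pipeline. In the toy data a single 3 a.m. ride at station 'B' produces the largest ratio, which suggests that station-hours with very few rides can top this ranking, so ride counts are worth checking alongside the ratio.

```python
from collections import Counter

def overload_ratios(rides):
    """rides: iterable of (station_id, hour) pairs, one per trip start.

    Returns {(station, hour): station hourly usage fraction / system hourly share}.
    """
    rides = list(rides)
    per_station_hour = Counter(rides)           # n[s, h]
    per_station = Counter(s for s, _ in rides)  # sum over hours of n[s, h]
    per_hour = Counter(h for _, h in rides)     # sum over stations of n[s, h]
    total = float(len(rides))
    ratios = {}
    for (station, hour), n in per_station_hour.items():
        station_fraction = n / float(per_station[station])
        system_fraction = per_hour[hour] / total
        ratios[(station, hour)] = station_fraction / system_fraction
    return ratios

# hypothetical rides: 'A' is busy at 8 a.m., 'B' has one 3 a.m. ride
rides = [('A', 8)] * 8 + [('A', 17)] * 2 + [('B', 8)] * 3 + [('B', 3)] * 1
ratios = overload_ratios(rides)
print(max(ratios, key=ratios.get))  # ('B', 3)
```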

(7) There are two types of riders: "Customers" and "Subscribers." Customers buy a short-term pass that allows 30-minute rides. Subscribers buy yearly passes that allow 45-minute rides. What fraction of rides exceed their corresponding time limit?

In [48]:
timelimit_short = 30*60  # Customer limit, in seconds
timelimit_long = 45*60   # Subscriber limit, in seconds
subscriber_df = raw_bike_df.filter(raw_bike_df.usertype == 'Subscriber')
subscriber_exceed_num = subscriber_df.filter(subscriber_df.tripduration > timelimit_long).count()
subscriber_exceed_ratio = float(subscriber_exceed_num)/subscriber_df.count()

customers_df = raw_bike_df.filter(raw_bike_df.usertype == 'Customer')
customers_exceed_num = customers_df.filter(customers_df.tripduration > timelimit_short).count()
customers_exceed_ratio = float(customers_exceed_num)/customers_df.count()

print(subscriber_exceed_ratio, customers_exceed_ratio)

Subscribers are less likely than Customers to exceed their single-ride time limit.
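Keeping the per-user-type limits in one lookup table makes it harder to compare a rider type against the wrong cap. A minimal plain-Python sketch, assuming hypothetical `(usertype, tripduration_s)` pairs; `TIME_LIMIT_S` and `exceed_fractions` are illustrative names, and rides with any other user type are skipped.

```python
# limits from the text: Customers 30 minutes, Subscribers 45 minutes (in seconds)
TIME_LIMIT_S = {'Customer': 30 * 60, 'Subscriber': 45 * 60}

def exceed_fractions(trips):
    """trips: iterable of (usertype, tripduration_s). Returns {usertype: fraction over limit}."""
    over, total = {}, {}
    for usertype, duration in trips:
        limit = TIME_LIMIT_S.get(usertype)
        if limit is None:
            continue  # unknown or missing usertype
        total[usertype] = total.get(usertype, 0) + 1
        over[usertype] = over.get(usertype, 0) + (duration > limit)
    return {u: over[u] / float(total[u]) for u in total}

# hypothetical rides
sample = [('Subscriber', 600), ('Subscriber', 2000), ('Subscriber', 3000),
          ('Customer', 1500), ('Customer', 2000)]
print(exceed_fractions(sample))
```

In the sample, the 2,000-second Subscriber ride (about 33 minutes) is within the 45-minute limit but would be counted as exceeding it if the 30-minute Customer cap were applied by mistake.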

(8) Most of the time, a bike begins a trip at the same station where its previous trip ended. Sometimes the program moves a bike, either for maintenance or to rebalance the distribution of bikes. Let's find out the average number of times a bike is moved during this period, detected by checking whether a trip starts at a different station than where the bike's previous ride ended.

In [51]:
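# Give every trip a sequential "index" after sorting by bike and start time, so consecutive
# trips of the same bike get consecutive indices. Shifting the end-station table by one index
# and joining on bikeid and index then lines up each trip with the bike's previous trip.
import pyspark.sql.functions as F
from pyspark.sql import Row

def mergeIndex(onerow):
  # onerow is a (Row, index) pair from zipWithIndex; add the index as a column
  fields = onerow[0].asDict()
  fields['index'] = onerow[1]
  return Row(**fields)

# starttime is not zero-padded ('1/10/2015 8:05'), so the raw string does not sort
# chronologically; parse it first (the 2015 files mix 'H:mm' and 'H:mm:ss' times)
start_ts = F.coalesce(F.unix_timestamp('starttime', 'M/d/yyyy H:mm:ss'),
                      F.unix_timestamp('starttime', 'M/d/yyyy H:mm'))
ordered_bike_df = (raw_bike_df.select('bikeid', 'starttime', start_ts.alias('start_ts'),
                                      raw_bike_df['start station id'].alias('start_station_id'),
                                      raw_bike_df['end station id'].alias('end_station_id'))
                              .orderBy('bikeid', 'start_ts'))
bike_index_df = ordered_bike_df.rdd.zipWithIndex().map(mergeIndex).toDF()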

In [52]:
display(bike_index_df.take(5))

In [53]:
start_df = bike_index_df.select('index', 'bikeid', 'start_station_id')
# shift the end stations by one index so each trip lines up with the previous trip
end_df = bike_index_df.selectExpr('index + 1 AS index', 'bikeid', 'end_station_id')
shifted_df = start_df.join(end_df, on=['bikeid', 'index']).cache()

In [54]:
diff_df = shifted_df.filter(shifted_df.start_station_id != shifted_df.end_station_id).cache()
diff_count = diff_df.count()
total_count = bike_index_df.count()

In [55]:
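print("Fraction of trips starting away from the bike's previous drop-off: {0:.2f}".format(diff_count / float(total_count)))
num_bikes = bike_index_df.select('bikeid').distinct().count()
print("Average number of moves per bike: {0:.1f}".format(diff_count / float(num_bikes)))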

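About 11% of trips start at a different station from where the bike's previous trip ended, meaning the program moved the bike in between (for maintenance or rebalancing).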

In [57]:
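# 2nd method: a window function pulls each bike's previous end station onto the current trip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# sort by the parsed start time; the non-zero-padded starttime string does not sort chronologically
start_ts = F.coalesce(F.unix_timestamp('starttime', 'M/d/yyyy H:mm:ss'),
                      F.unix_timestamp('starttime', 'M/d/yyyy H:mm'))
timed_bike_df = raw_bike_df.withColumn('start_ts', start_ts)
w = Window.partitionBy('bikeid').orderBy('start_ts')
aligned_bike_df = timed_bike_df.select('*', F.lag('end station id').over(w).alias('stop station id'))
display(aligned_bike_df)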


In [58]:
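# each bike's first trip has no previous trip (null lag); drop only those rows, not rows with
# unrelated nulls such as a missing birth year
bike_replace_df = (aligned_bike_df.dropna(subset=['stop station id'])
                   .filter(aligned_bike_df['start station id'] != aligned_bike_df['stop station id']))
replace_ratio = float(bike_replace_df.count())/raw_bike_df.count()
print(replace_ratio)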
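Both Spark methods apply the same rule: sort each bike's trips by time and count the trips whose start station differs from the previous trip's end station. The plain-Python sketch below assumes hypothetical `(bikeid, starttime, start station, end station)` tuples and parses `starttime` before sorting, because the non-zero-padded strings do not sort chronologically ('1/10/2015' sorts before '1/2/2015'); `parse_start` and `count_moves` are illustrative names.

```python
from datetime import datetime

def parse_start(start_time):
    """Parse the two starttime layouts found in the 2015 files."""
    for fmt in ('%m/%d/%Y %H:%M:%S', '%m/%d/%Y %H:%M'):
        try:
            return datetime.strptime(start_time, fmt)
        except ValueError:
            pass
    raise ValueError('unrecognised starttime: %r' % start_time)

def count_moves(trips):
    """trips: iterable of (bikeid, starttime, start_station, end_station).

    Returns (moves, n_trips), where a move is a trip that starts somewhere other
    than where the same bike's previous trip ended.
    """
    ordered = sorted(trips, key=lambda t: (t[0], parse_start(t[1])))
    moves = 0
    prev_bike, prev_end = None, None
    for bike, _, start, end in ordered:
        if bike == prev_bike and start != prev_end:
            moves += 1
        prev_bike, prev_end = bike, end
    return moves, len(ordered)

# hypothetical trip log, deliberately out of order
log = [
    (7, '1/10/2015 9:00', 'B', 'C'),
    (7, '1/2/2015 8:00', 'A', 'B'),
    (7, '1/11/2015 7:30', 'C', 'D'),
    (8, '1/3/2015 12:00', 'X', 'Y'),
    (8, '1/4/2015 08:15:00', 'Z', 'X'),  # starts at Z, but the bike was left at Y: moved
]
print(count_moves(log))  # (1, 5)
```

Sorting bike 7's trips by the raw strings would place the 1/2 trip last and report a spurious move (a trip starting at A right after one ending at D).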