In [1]:
from pyspark.sql.types import *
from datetime import datetime

INPUT_DATA = '../data/sf-bay-area-bike-share/'

In [2]:
!head -5 {INPUT_DATA}status.csv

station_id,bikes_available,docks_available,time
2,2,25,2013/08/29 12:06:01
2,2,25,2013/08/29 12:07:01
2,2,25,2013/08/29 12:08:01
2,2,25,2013/08/29 12:09:01


In [3]:
!head -5 {INPUT_DATA}station.csv

id,name,lat,long,dock_count,city,installation_date
2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013


In [4]:
bike_status = sc.textFile(INPUT_DATA + 'status.csv').cache()
header = bike_status.first()
bike_status = bike_status.filter(lambda x: x != header).map(lambda x: x.split(',')).cache()

In [5]:
bike_status.count()

71984434

In [6]:
test_ts = bike_status.take(1)[-1][-1]
dt = datetime.strptime(test_ts, "%Y/%m/%d %H:%M:%S")
dt

datetime.datetime(2013, 8, 29, 12, 6, 1)

In [7]:
def toIntSafe(num):
    try:
        return int(num)
    except ValueError:
        return None
    
def toFloatSafe(num):
    try:
        return float(num)
    except ValueError:
        return None    
    
def toTimeStampSafe(data):
    try:
        return datetime.strptime(data, "%Y/%m/%d %H:%M:%S") 
    except ValueError:
        return None
    
def convertData(data):
    return (toIntSafe(data[0]),
            toIntSafe(data[1]),
            toIntSafe(data[2]),
            toTimeStampSafe(data[3]))

def preprocess_data(data):
    try:
        return (data[0], (data[2]*1.0)/(data[1] + data[2]), data[3].year,
            data[3].month, data[3].day, data[3].hour, data[3].minute, data[3].isoweekday())
    except (AttributeError, ValueError, ZeroDivisionError):
        return None

bike_status_processed = bike_status.map(lambda x:
                                        convertData(x)).map(lambda x:preprocess_data(x)).filter(lambda x: x != None).cache()

In [8]:
bike_status_processed.count()

16994602

In [9]:
bike_status.unpersist()

PythonRDD[3] at RDD at PythonRDD.scala:48

In [10]:
bike_status_processed.take(3)

[(2, 0.9259259259259259, 2013, 8, 29, 12, 6, 4),
 (2, 0.9259259259259259, 2013, 8, 29, 12, 7, 4),
 (2, 0.9259259259259259, 2013, 8, 29, 12, 8, 4)]

In [11]:
bike_status_schema = StructType([StructField('station_id', IntegerType(),False),
                                StructField('bikes_utilised_percentage', FloatType(),False),
                                StructField('year', IntegerType(), False),
                                StructField('month', IntegerType(), False),
                                StructField('day', IntegerType(), False),
                                StructField('hour', IntegerType(), False),
                                StructField('minute', IntegerType(), False),
                                StructField('day_of_week', IntegerType(), False)])

In [12]:
bike_status_df = sqlContext.createDataFrame(bike_status_processed, bike_status_schema).cache()
bike_status_df.show(5)

+----------+-------------------------+----+-----+---+----+------+-----------+
|station_id|bikes_utilised_percentage|year|month|day|hour|minute|day_of_week|
+----------+-------------------------+----+-----+---+----+------+-----------+
|         2|                0.9259259|2013|    8| 29|  12|     6|          4|
|         2|                0.9259259|2013|    8| 29|  12|     7|          4|
|         2|                0.9259259|2013|    8| 29|  12|     8|          4|
|         2|                0.9259259|2013|    8| 29|  12|     9|          4|
|         2|                0.9259259|2013|    8| 29|  12|    10|          4|
+----------+-------------------------+----+-----+---+----+------+-----------+
only showing top 5 rows



In [13]:
bike_status_df.count()

16994602

In [14]:
bike_status_df.printSchema()

root
 |-- station_id: integer (nullable = false)
 |-- bikes_utilised_percentage: float (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- hour: integer (nullable = false)
 |-- minute: integer (nullable = false)
 |-- day_of_week: integer (nullable = false)



In [15]:
bike_status_processed.unpersist()

PythonRDD[6] at RDD at PythonRDD.scala:48

In [16]:
bike_status_df.select('day_of_week').distinct().collect()

[Row(day_of_week=1),
 Row(day_of_week=6),
 Row(day_of_week=3),
 Row(day_of_week=5),
 Row(day_of_week=4),
 Row(day_of_week=7),
 Row(day_of_week=2)]

In [17]:
bike_status_df.select('hour').distinct().collect()

[Row(hour=12),
 Row(hour=22),
 Row(hour=1),
 Row(hour=13),
 Row(hour=16),
 Row(hour=6),
 Row(hour=3),
 Row(hour=20),
 Row(hour=5),
 Row(hour=19),
 Row(hour=15),
 Row(hour=17),
 Row(hour=9),
 Row(hour=4),
 Row(hour=8),
 Row(hour=23),
 Row(hour=7),
 Row(hour=10),
 Row(hour=21),
 Row(hour=11),
 Row(hour=14),
 Row(hour=2),
 Row(hour=0),
 Row(hour=18)]

In [18]:
from pyspark.sql import functions as F

bike_status_period = bike_status_df.withColumn('day_part',
                                               F.when((bike_status_df["hour"] >= 20) | (bike_status_df["hour"] < 6), 'night').\
                                               when((bike_status_df["hour"] >= 6) & (bike_status_df["hour"] < 12), 'morning').\
                                               when((bike_status_df["hour"] >= 12) & (bike_status_df["hour"] < 16), 'afternoon').\
                                               otherwise('evening'))

bike_status_period = bike_status_period.withColumn('isWeekday',
                                                   F.when(bike_status_period["day_of_week"] <= 5, 1).otherwise(0)).cache()
bike_status_period.show(5)

+----------+-------------------------+----+-----+---+----+------+-----------+---------+---------+
|station_id|bikes_utilised_percentage|year|month|day|hour|minute|day_of_week| day_part|isWeekday|
+----------+-------------------------+----+-----+---+----+------+-----------+---------+---------+
|         2|                0.9259259|2013|    8| 29|  12|     6|          4|afternoon|        1|
|         2|                0.9259259|2013|    8| 29|  12|     7|          4|afternoon|        1|
|         2|                0.9259259|2013|    8| 29|  12|     8|          4|afternoon|        1|
|         2|                0.9259259|2013|    8| 29|  12|     9|          4|afternoon|        1|
|         2|                0.9259259|2013|    8| 29|  12|    10|          4|afternoon|        1|
+----------+-------------------------+----+-----+---+----+------+-----------+---------+---------+
only showing top 5 rows



In [19]:
bike_status_period.select('day_part').distinct().collect()

[Row(day_part=u'afternoon'),
 Row(day_part=u'night'),
 Row(day_part=u'morning'),
 Row(day_part=u'evening')]

In [20]:
bike_status_period.count()

16994602

In [21]:
bike_status_period = bike_status_period.drop('year', 'hour', 'minute', 'day', 'isWeekday').cache()
bike_status_period.count()

16994602

In [23]:
bike_status_final = bike_status_period.groupBy('station_id', 'month', 'day_part', 'day_of_week')\
                        .agg(F.min('bikes_utilised_percentage').alias('min_util_perc'),
                             F.max('bikes_utilised_percentage').alias('max_util_perc'),
                             F.variance('bikes_utilised_percentage').alias('var_util_perc'),
                             F.mean('bikes_utilised_percentage').alias('avg_util_perc'))
bike_status_final.show(5)

+----------+-----+--------+-----------+-------------+-------------+--------------------+-------------------+
|station_id|month|day_part|day_of_week|min_util_perc|max_util_perc|       var_util_perc|      avg_util_perc|
+----------+-----+--------+-----------+-------------+-------------+--------------------+-------------------+
|         5|   10| evening|          4|   0.10526316|   0.68421054|0.007285435119161...| 0.5196670549274289|
|         6|   12| morning|          6|   0.46666667|   0.73333335|0.004485562673316062| 0.6242195955995057|
|         7|    9| morning|          1|   0.46666667|    0.6666667|0.005159173545484629| 0.5324195619312885|
|         8|   12| morning|          2|          0.2|          0.6|0.011087554853032108|0.44103705240620505|
|        10|    9| morning|          6|   0.33333334|          0.8|0.018026613717539944| 0.5929139699094639|
+----------+-----+--------+-----------+-------------+-------------+--------------------+-------------------+
only showing top 5 

In [41]:
bike_status_final = bike_status_period.groupBy('station_id', 'month', 'day_part', 'day_of_week')\
                        .agg(F.variance('bikes_utilised_percentage').alias('var_util_perc'),
                             F.mean('bikes_utilised_percentage').alias('avg_util_perc'))
bike_status_final.show(5)

+----------+-----+--------+-----------+--------------------+-------------------+
|station_id|month|day_part|day_of_week|       var_util_perc|      avg_util_perc|
+----------+-----+--------+-----------+--------------------+-------------------+
|         5|   10| evening|          4|0.007285435119161...| 0.5196670549274289|
|         6|   12| morning|          6|0.004485562673316062| 0.6242195955995057|
|         7|    9| morning|          1|0.005159173545484629| 0.5324195619312885|
|         8|   12| morning|          2|0.011087554853032108|0.44103705240620505|
|        10|    9| morning|          6|0.018026613717539944| 0.5929139699094639|
+----------+-----+--------+-----------+--------------------+-------------------+
only showing top 5 rows



In [42]:
bike_status_final.cache()
bike_status_final.count()

11720

In [43]:
!head -5 {INPUT_DATA}station.csv

id,name,lat,long,dock_count,city,installation_date
2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013


In [96]:
# bike_status_updated = bike_status_final.withColumn('station_id',
#     F.when(bike_status_final['station_id'] == 80, 15).
#     when(bike_status_final['station_id'] == 82, 78).
#     when(bike_status_final['station_id'] == 83, 20).
#     when(bike_status_final['station_id'] == 84, 17).
#     otherwise(bike_status_final['station_id']))

# bike_status_updated.show(5)

+----------+-----+--------+-----------+--------------------+-------------------+
|station_id|month|day_part|day_of_week|       var_util_perc|      avg_util_perc|
+----------+-----+--------+-----------+--------------------+-------------------+
|         5|   10| evening|          4|0.007285435119161...| 0.5196670549274289|
|         6|   12| morning|          6|0.004485562673316062| 0.6242195955995057|
|         7|    9| morning|          1|0.005159173545484629| 0.5324195619312885|
|         8|   12| morning|          2|0.011087554853032108|0.44103705240620505|
|        10|    9| morning|          6|0.018026613717539944| 0.5929139699094639|
+----------+-----+--------+-----------+--------------------+-------------------+
only showing top 5 rows



In [44]:
# daily_avg = bike_status_period.select('station_id', 'day_part', 'isWeekday', 'bikes_utilised_percentage').\
#                                    groupBy('station_id', 'day_part', 'isWeekday').\
#     mean('bikes_utilised_percentage').cache()

# daily_avg.show(5)
# daily_avg = daily_avg.withColumnRenamed('avg(bikes_utilised_percentage)', 'avg_bike_util')

In [92]:
# station = sc.textFile(INPUT_DATA + 'station.csv')
# header = station.first()
# station = station.filter(lambda x: x != header).map(lambda x: x.split(',')).map(lambda x:
#                                                                                 (int(x[0]), float(x[2]),float(x[3])))

# station_schema = StructType([StructField('station_id', IntegerType(), False),
#                             StructField('latitude', DoubleType(), False),
#                             StructField('longitude', DoubleType(), False)])

# station_df = sqlContext.createDataFrame(station, station_schema)

# bike_combined = bike_status_final.join(station_df, on='station_id')
# bike_combined.show(5)

# bike_combined = bike_combined.drop('station_id') 
# bike_combined.cache()
# bike_combined.count()

# bike_combined.show(5)

# bike_combined.toPandas().to_csv(INPUT_DATA + 'bike_aggregated_data.csv', index = False)

# !head -5 {INPUT_DATA}bike_aggregated_data.csv

In [356]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

def implement_string_indexer(cols, df):
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+'_si')
        sm = si.fit(df)
        df = sm.transform(df).drop(c)
        df = df.withColumnRenamed(c + '_si', c)
        return df

cols = ['day_part']

final_df = implement_string_indexer(cols, bike_status_final)

In [357]:
final_df.cache()
final_df.count()
final_df.show(5)

+----------+-----+-----------+--------------------+-------------------+--------+
|station_id|month|day_of_week|       var_util_perc|      avg_util_perc|day_part|
+----------+-----+-----------+--------------------+-------------------+--------+
|         5|   10|          4|0.007285435119161...| 0.5196670549274289|     2.0|
|         6|   12|          6|0.004485562673316062| 0.6242195955995057|     3.0|
|         7|    9|          1|0.005159173545484629| 0.5324195619312885|     3.0|
|         8|   12|          2|0.011087554853032108|0.44103705240620505|     3.0|
|        10|    9|          6|0.018026613717539944| 0.5929139699094639|     3.0|
+----------+-----+-----------+--------------------+-------------------+--------+
only showing top 5 rows



In [358]:
station_ids = final_df.select('station_id').collect()
station_ids = [x.asDict()['station_id'] for x in station_ids]
df_without_sid = final_df.drop('station_id')

In [359]:
# input_cols = ['day_of_week', 'avg_util_perc', 'month', 
#               'day_part', 'latitude', 'longitude',
#               'min_util_perc', 'max_util_perc', 'var_util_perc']

input_cols = ['day_of_week', 'avg_util_perc', 'month', 
              'day_part', 'var_util_perc']

va = VectorAssembler(inputCols= input_cols, outputCol= 'features')
df_transformed = va.transform(df_without_sid).select('features')
df_transformed.show(5)

+--------------------+
|            features|
+--------------------+
|[4.0,0.5196670549...|
|[6.0,0.6242195955...|
|[1.0,0.5324195619...|
|[2.0,0.4410370524...|
|[6.0,0.5929139699...|
+--------------------+
only showing top 5 rows



In [360]:
df_transformed.cache()
df_transformed.count()

11720

In [361]:
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
#                         withStd=True, withMean=True)

# # Compute summary statistics by fitting the StandardScaler
# scalerModel = scaler.fit(df_transformed)

# # Normalize each feature to have unit standard deviation.
# scaledData = scalerModel.transform(df_transformed).select('scaledFeatures').\
#                 withColumnRenamed('scaledFeatures', 'features')
    
# scaledData.cache()
# scaledData.count()

In [362]:
min_wssse = 1e10

for clusters in range(2,20):

    # Trains a k-means model.
    kmeans = KMeans().setK(clusters).setSeed(1)
    model = kmeans.fit(df_transformed)

    # Evaluate clustering by computing Within Set Sum of Squared Errors.
    wssse = model.computeCost(df_transformed)
    if wssse <= min_wssse:
        min_wssse = wssse
        best_n_clusters = clusters

In [363]:
print("Best Within Set Sum of Squared Errors = " + str(min_wssse))
print("Best number of clusters = " + str(best_n_clusters))

kmeans = KMeans().setK(best_n_clusters).setSeed(1)
model = kmeans.fit(df_transformed)

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Best Within Set Sum of Squared Errors = 12538.8344448
Best number of clusters = 19
Cluster Centers: 
[ 6.          0.52275087  8.5         2.5         0.01304861]
[ 1.5         0.50786387  1.50364964  0.5         0.01453814]
[  4.5          0.49685866  11.5          0.5          0.02099614]
[ 6.2         0.51249939  1.50364964  2.6         0.01617827]
[  2.06213592   0.49662004  11.13009709   0.43786408   0.01816505]
[ 1.5         0.50770679  9.42857143  1.71428571  0.02375764]
[ 3.5         0.51419353  1.50364964  0.5         0.01967513]
[  6.5          0.50107223  11.           2.5          0.01742232]
[ 4.          0.51138767  9.38461538  2.46153846  0.02545185]
[ 4.28571429  0.50842867  9.          0.5         0.01719346]
[ 5.16747573  0.51629141  1.66990291  1.16504854  0.01868887]
[ 6.16625917  0.51049487  1.16870416  0.33251834  0.01665369]
[ 7.          0.50773029  2.          0.5         0.01406227]
[  4.5          0.50548268  11.5          2.5          0.02498372]
[  6.5     

In [364]:
prediction = model.transform(df_transformed).select('prediction').collect()
predicted_cluster = [x.asDict()['prediction'] for x in prediction]

In [365]:
from collections import Counter
Counter(predicted_cluster)

Counter({0: 512,
         1: 548,
         2: 512,
         3: 685,
         4: 1030,
         5: 896,
         6: 548,
         7: 768,
         8: 832,
         9: 896,
         10: 412,
         11: 409,
         12: 138,
         13: 512,
         14: 512,
         15: 640,
         16: 548,
         17: 774,
         18: 548})

In [366]:
station_id_rd =  sc.parallelize(zip(station_ids, predicted_cluster))

station_id_schema = StructType([StructField('station_id', IntegerType()),
                                StructField('cluster', IntegerType())])

station_id_df = sqlContext.createDataFrame(station_id_rd, station_id_schema)


#import math
#station_final_cluster = station_cluster.withColumn('cluster').drop('avg_cluster')

# station_final_cluster = sqlContext.createDataFrame(station_id_df.toPandas().groupby('station_id').\
#                                                    agg(lambda x:x.value_counts().index[0]).reset_index())

station_final_cluster = sqlContext.createDataFrame(station_id_df.toPandas().groupby('station_id').\
                                                   median().reset_index())

In [367]:
Counter([x.asDict()['cluster'] for x in station_final_cluster.select('cluster').collect()])

Counter({8: 64, 10: 5})

In [368]:
station_final_cluster.filter('cluster == 10').show()

+----------+-------+
|station_id|cluster|
+----------+-------+
|        31|     10|
|        32|     10|
|        80|     10|
|        82|     10|
|        83|     10|
+----------+-------+



In [369]:
##Normaliser example

# y = sc.parallelize(range(1,11)).map(lambda x: (x, x**2))

# y_df = sqlContext.createDataFrame(y, StructType([StructField('num', IntegerType()),
#                                                  StructField('num_sq', IntegerType())]))

# from pyspark.ml.feature import Normalizer

# va_small = VectorAssembler(inputCols=['num', 'num_sq'],
#                            outputCol='features')

# y_df_transformed = va_small.transform(y_df).select('features')
# y_df_transformed.show(5)

# norm = Normalizer(inputCol = 'features', outputCol = 'scaledFeatures')

# # Compute summary statistics by fitting the StandardScaler
# normData = norm.transform(y_df_transformed).select('scaledFeatures').\
#                 withColumnRenamed('scaledFeatures', 'features')

# normData.collect()

# num = y_df.select('num').collect()
# y_df = y_df.drop('num')

# num = [x.asDict()['num'] for x in num]

# y_pd = y_df.toPandas()
# y_pd['num'] = num

# y_df_num = sqlContext.createDataFrame(y_pd)

# y_df_num.printSchema()