**The original raw dataset is over 3 GB, which exceeds the GitHub file size limit, so this notebook can only process a sample subset of the data

In [1]:
# import pyspark
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DP_bicycle').getOrCreate()

In [2]:
# Load the CSV file; the first row is the header and column types are inferred automatically.
df_raw = spark.read.csv('Datasets/London.csv', header=True, inferSchema=True)

In [3]:
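# Import only the functions used below; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import avg, col, count, date_format, dayofweek, hour, log10, to_date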

*Data cleaning (3.2)

In [4]:
# Row count before cleaning
rowCount_before = df_raw.count()
# Remove every row that contains a null value
df_raw = df_raw.na.drop()
# Remove extreme values: only ordinary commuting trips are of interest,
# so keep rentals lasting at most 2 hours (7200 seconds)
df_raw = df_raw.filter(col("duration") <= 7200)
rowCount_after = df_raw.count()
# Report the cleaning result
print("Removed " + str(rowCount_before-rowCount_after) + " from raw dataset, " + str(rowCount_after) + " after Clean.")

Removed 397075 from raw dataset, 30784706 after Clean.
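
The two figures in the message above are enough to recover the raw row count and the share of rows discarded by the null and duration filters. A small arithmetic sketch in plain Python follows; the variable names are illustrative and do not appear in the notebook.

```python
# Figures copied from the cleaning output above
removed = 397075
remaining = 30784706

# The raw row count before cleaning is the sum of both
raw_total = removed + remaining

# Share of the raw dataset discarded by the two filters
removed_share = removed / raw_total

print(f"raw rows: {raw_total}, removed share: {removed_share:.2%}")
```

The removed share comes out a little above 1%, so the two filters trim only a small part of the data.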


*Data construction (3.3)
*Add date, Time_Hour and DayOfWeek columns derived from start_rental_date_time

In [17]:
df_raw = df_raw.withColumn('date', to_date("start_rental_date_time"))\
                           .withColumn('Time_Hour', hour("start_rental_date_time"))\
                           .withColumn('DayOfWeek', date_format("start_rental_date_time", 'EEEE'))
df_raw.printSchema()

root
 |-- rental_id: integer (nullable = true)
 |-- duration: double (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- end_rental_date_time: timestamp (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_rental_date_time: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- Time_Hour: integer (nullable = true)
 |-- DayOfWeek: string (nullable = true)
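
To make the three derived columns concrete, the following plain-Python sketch computes the same kind of values for a single timestamp with the standard library. It only mirrors the intent of `to_date`, `hour` and `date_format(..., 'EEEE')`; the sample timestamp and the helper name `derive_columns` are made up for illustration.

```python
from datetime import datetime

def derive_columns(start):
    """Return (date, hour, weekday name) for one rental start time."""
    return start.date(), start.hour, start.strftime("%A")

# Hypothetical rental start on 3 January 2017 at 07:45
sample = datetime(2017, 1, 3, 7, 45)
date_part, hour_part, weekday_name = derive_columns(sample)
print(date_part, hour_part, weekday_name)
```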



*We create a new dataset (df1) containing the commuting trips between St.Peter's Terrace and Parsons Green Station, in either direction
*df2 contains only the trips that start from St.Peter's Terrace

In [27]:
df1 = df_raw.filter(((col("start_station_id") == 729) & (col("end_station_id") == 671))
              | ((col("start_station_id") == 671) & (col("end_station_id") == 729)))

In [31]:
df2 = df1.filter(col("start_station_id") == 729)
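
The filter for df1 keeps a trip when its start and end stations are 729 and 671 in either order. The same condition can be sketched in plain Python as a set comparison; `is_commute_trip` is a hypothetical helper, the station IDs are the ones used in the cells above, and the sample trips are invented.

```python
STATION_PAIR = {729, 671}

def is_commute_trip(start_station_id, end_station_id):
    """True when the trip connects the two stations in either direction."""
    return {start_station_id, end_station_id} == STATION_PAIR

# Invented sample trips: two valid directions, one round trip, one unrelated trip
trips = [(729, 671), (671, 729), (729, 729), (671, 12)]
kept = [t for t in trips if is_commute_trip(*t)]
print(kept)
```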

Hourly statistics (Tuesday trips starting from St.Peter's Terrace)

In [33]:
df2.withColumn("int_week", dayofweek("start_rental_date_time"))\
        .groupBy("int_week","DayOfWeek","Time_Hour")\
        .agg(count("duration").alias("count"), avg("duration").alias("avg_duration"))\
        .filter(col("int_week") ==3)\
        .orderBy("int_week","Time_Hour")\
        .show()
        

+--------+---------+---------+-----+------------------+
|int_week|DayOfWeek|Time_Hour|count|      avg_duration|
+--------+---------+---------+-----+------------------+
|       3|  Tuesday|        0|    1|             240.0|
|       3|  Tuesday|        5|   10|             172.1|
|       3|  Tuesday|        6|  110|199.96363636363637|
|       3|  Tuesday|        7|  488|202.53483606557376|
|       3|  Tuesday|        8|  243|215.00411522633746|
|       3|  Tuesday|        9|   73|198.68493150684932|
|       3|  Tuesday|       10|   28|199.28571428571428|
|       3|  Tuesday|       11|   29| 397.2413793103448|
|       3|  Tuesday|       12|   25|             211.2|
|       3|  Tuesday|       13|   17| 204.7058823529412|
|       3|  Tuesday|       14|   12|             285.0|
|       3|  Tuesday|       15|   22|215.45454545454547|
|       3|  Tuesday|       16|   70|206.57142857142858|
|       3|  Tuesday|       17|  194| 219.2577319587629|
|       3|  Tuesday|       18|  163|233.68098159
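
The filter `int_week == 3` selects Tuesdays because Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday). A plain-Python sketch of the same numbering, using a hypothetical helper `spark_dayofweek`:

```python
from datetime import date

def spark_dayofweek(d):
    """Day number in the 1 = Sunday ... 7 = Saturday convention."""
    # isoweekday() gives Monday = 1 ... Sunday = 7, so shift by one day
    return d.isoweekday() % 7 + 1

# 3 January 2017 was a Tuesday
print(spark_dayofweek(date(2017, 1, 3)))
```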

*Data Integration (3.4)
*Load the weather data and join it with the bicycle data on the 'date' column

In [34]:
df_weather = spark.read.csv('Datasets/London_weather.csv', header=True, inferSchema=True)
df_weather.printSchema()

root
 |-- datetime: date (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipcover: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- day_length: double (nullable = true)



Aggregate to daily statistics and merge with the weather data (3.5)

In [46]:
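# Daily trip count and mean duration for the station pair
df_stat = df1.groupBy("date", "DayOfWeek")\
            .agg(count("duration").alias("count"), avg("duration").alias("avg_duration"))

# Inner join on the calendar date; days without a weather record are dropped
df_join = df_stat.join(df_weather, df_stat.date == df_weather.datetime, "inner")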
df_join.orderBy("date").show()

+----------+---------+-----+------------------+----------+---------+------+-----------+---------+----------+
|      date|DayOfWeek|count|      avg_duration|  datetime|feelslike|precip|precipcover|windspeed|day_length|
+----------+---------+-----+------------------+----------+---------+------+-----------+---------+----------+
|2017-01-01|   Sunday|    2|             180.0|2017-01-01|      3.8| 8.976|       12.5|     20.3|     7.936|
|2017-01-02|   Monday|    7|171.42857142857142|2017-01-02|      0.9|   2.0|       4.17|     15.4|     7.956|
|2017-01-03|  Tuesday|   13|203.07692307692307|2017-01-03|     -1.0| 0.001|       4.17|     20.2|     7.978|
|2017-01-04|Wednesday|   14|188.57142857142858|2017-01-04|      2.5|   0.0|        0.0|     17.6|     8.002|
|2017-01-05| Thursday|   14|162.85714285714286|2017-01-05|      2.2|   0.0|        0.0|      7.4|     8.027|
|2017-01-06|   Friday|   10|             198.0|2017-01-06|      1.8| 1.002|       8.33|     16.4|     8.054|
|2017-01-07| Saturd
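
An inner join keeps only the dates present on both sides. That would explain why the joined output starts at 2017-01-01 while df_stat also holds rows from late December 2016: presumably the weather file has no records for those days. A plain-Python sketch of that behaviour with dictionaries keyed by date follows; the values are a few rows copied from the notebook outputs, and the weather coverage is an assumption.

```python
# Daily trip counts (a few rows copied from the df_stat output)
trip_counts = {"2016-12-31": 1, "2017-01-01": 2, "2017-01-02": 7}

# Precipitation per day (copied from the joined output); assumed to start in 2017
precip = {"2017-01-01": 8.976, "2017-01-02": 2.0, "2017-01-03": 0.001}

# Inner join: keep only the dates that appear in both mappings
joined = {
    day: (trip_counts[day], precip[day])
    for day in sorted(trip_counts.keys() & precip.keys())
}
print(joined)
```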

In [42]:
df_stat.printSchema()
df_stat.show()

root
 |-- date: date (nullable = true)
 |-- count: long (nullable = false)
 |-- avg_duration: double (nullable = true)

+----------+-----+------------------+
|      date|count|      avg_duration|
+----------+-----+------------------+
|2016-12-28|    2|             180.0|
|2016-12-30|    1|             240.0|
|2016-12-31|    1|             120.0|
|2017-01-01|    2|             180.0|
|2017-01-02|    7|171.42857142857142|
|2017-01-03|   13|203.07692307692307|
|2017-01-04|   14|188.57142857142858|
|2017-01-05|   14|162.85714285714286|
|2017-01-06|   10|             198.0|
|2017-01-07|    5|             168.0|
|2017-01-08|    2|             330.0|
|2017-01-09|   14|205.71428571428572|
|2017-01-10|   21|211.42857142857142|
|2017-01-11|   23|190.43478260869566|
|2017-01-12|   12|             200.0|
|2017-01-13|   10|             276.0|
|2017-01-14|   11|223.63636363636363|
|2017-01-15|    5|             216.0|
|2017-01-16|   22|196.36363636363637|
|2017-01-17|   32|           226.875|
+-----

In [47]:
df_join.count()

1084

**Data transformation 

Remove weekends and selected holidays (4.1)

In [50]:
df_join = df_join.filter(df_join.DayOfWeek != 'Sunday') \
                    .filter(df_join.DayOfWeek != 'Saturday')

In [53]:
df_join = df_join.filter(df_join.date != '2017-01-01') \
                    .filter(df_join.date != '2017-01-02') \
                    .filter(df_join.date != '2017-12-25') \
                    .filter(df_join.date != '2017-12-26') \
                    .filter(df_join.date != '2018-01-01') \
                    .filter(df_join.date != '2018-01-02') \
                    .filter(df_join.date != '2018-12-25') \
                    .filter(df_join.date != '2018-12-26') \
                    .filter(df_join.date != '2019-01-01') \
                    .filter(df_join.date != '2019-01-02') \
                    .filter(df_join.date != '2019-12-25') \
                    .filter(df_join.date != '2019-12-26')
df_join.count()

767
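
The twelve excluded dates follow a regular pattern: 1 and 2 January and 25 and 26 December for each of 2017, 2018 and 2019. A sketch in plain Python that generates the same list instead of spelling it out; `excluded_holidays` is a hypothetical helper name.

```python
from datetime import date

def excluded_holidays(years):
    """1-2 January and 25-26 December for each given year, as ISO strings."""
    month_days = [(1, 1), (1, 2), (12, 25), (12, 26)]
    return [date(y, m, d).isoformat() for y in years for (m, d) in month_days]

holidays = excluded_holidays(range(2017, 2020))
print(len(holidays), holidays[0], holidays[-1])
```

One option would be to pass such a list to a single `isin` test on the date column, negated with `~`, in place of the chained filters.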

*Logarithmic transformation (4.2)

In [55]:
df_join = df_join.withColumn("lg_day_length", log10(col("day_length"))) \
                .withColumn("lg_windspeed", log10(col("windspeed")))
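
A base-10 logarithm is only defined for positive inputs, so a day with zero wind speed has no finite logarithm. The plain-Python sketch below applies the transformation to values taken from the joined output, with a guard that returns `None` for non-positive inputs. The helper `safe_log10` is illustrative, and how such rows should be treated in the notebook is left open.

```python
import math

def safe_log10(value):
    """Base-10 logarithm, or None when the input is missing or not positive."""
    if value is None or value <= 0:
        return None
    return math.log10(value)

# day_length and windspeed for 2017-01-01, copied from the joined output
print(round(safe_log10(7.936), 4), round(safe_log10(20.3), 4))
# A zero value has no logarithm, so the guard returns None
print(safe_log10(0.0))
```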


In [59]:
df_join.printSchema()

root
 |-- date: date (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- count: long (nullable = false)
 |-- avg_duration: double (nullable = true)
 |-- datetime: date (nullable = true)
 |-- feelslike: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- precipcover: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- day_length: double (nullable = true)
 |-- lg_day_length: double (nullable = true)
 |-- lg_windspeed: double (nullable = true)

