In [1]:
from pyspark.sql.functions import year
from pyspark.sql.functions import month
from pyspark.sql.functions import dayofyear
from pyspark.sql.functions import dayofweek
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import weekofyear
from pyspark.sql.functions import hour
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [16]:
df = spark.read.csv('/Users/shuaishao/udacity/bike-share-data-engineering/dataset/weather_data/*.csv', header=True)

In [28]:
df.select(['date',
							 'temperature',
							 'temperature_min',
							 'temperature_max',
							 'precipitation',
							 'winddirection',
							 'windspeed',
							 'pressure',
							 'station'])

DataFrame[date: string, temperature: string, temperature_min: string, temperature_max: string, precipitation: string, winddirection: string, windspeed: string, pressure: string, station: string]

In [14]:
df = df.select(
    col('short_name').alias('station_short_name'),
    col('external_id').alias('station_external_id'),
    col('has_kiosk').alias('station_has_kiosk'),
    col('rental_methods').alias('station_rental_methods'),
    col('capacity').alias('station_capacity'),
    col('region_id').alias('station_region_id'),
    col('station_type').alias('station_type'),
    col('name').alias('station_name'),
    col('lon').alias('station_longitude'),
    col('station_id').alias('station_id'),
    col('lat').alias('station_latitude'),
    col('weather_station_id')
    
)

In [15]:
df.show(5)

+------------------+--------------------+-----------------+----------------------+----------------+-----------------+------------+--------------------+-------------------+----------+-----------------+------------------+
|station_short_name| station_external_id|station_has_kiosk|station_rental_methods|station_capacity|station_region_id|station_type|        station_name|  station_longitude|station_id| station_latitude|weather_station_id|
+------------------+--------------------+-----------------+----------------------+----------------+-----------------+------------+--------------------+-------------------+----------+-----------------+------------------+
|            SF-G27|1b13a386-c5f4-42c...|             True|  ['KEY', 'CREDITCA...|              35|              3.0|     classic|Powell St BART St...|-122.40490436553955|         3|37.78637526861584|             74506|
|            SF-G26|a00d04e6-0159-466...|             True|  ['KEY', 'CREDITCA...|              35|              3.0|   

In [8]:
df = df.select(['short_name', 'external_id', 'has_kiosk', 'rental_methods', 'capacity', 'region_id', 'station_type', 'name', 'lon', 'station_id', 'lat', 'weather_station_id'])

In [12]:
df.show(2)

+----------+--------------------+---------+--------------------+--------+---------+------------+--------------------+-------------------+----------+-----------------+------------------+
|short_name|         external_id|has_kiosk|      rental_methods|capacity|region_id|station_type|                name|                lon|station_id|              lat|weather_station_id|
+----------+--------------------+---------+--------------------+--------+---------+------------+--------------------+-------------------+----------+-----------------+------------------+
|    SF-G27|1b13a386-c5f4-42c...|     True|['KEY', 'CREDITCA...|      35|      3.0|     classic|Powell St BART St...|-122.40490436553955|         3|37.78637526861584|             74506|
|    SF-G26|a00d04e6-0159-466...|     True|['KEY', 'CREDITCA...|      35|      3.0|     classic|Cyril Magnin St a...| -122.4089150084319|         4|37.78588062694133|             74506|
+----------+--------------------+---------+--------------------+------

In [45]:
df = df.where(col("start_station_id").isNotNull()) \
     .where(col("end_station_id").isNotNull()) \
     .select(['duration_sec', 'start_time', 'end_time', 'start_station_id', 'end_station_id', 'bike_id', 'user_type', 'rental_access_method'])

In [86]:
df.show(10)

+------------+--------------------+--------------------+----------------+--------------+-------+---------+--------------------+
|duration_sec|          start_time|            end_time|start_station_id|end_station_id|bike_id|user_type|rental_access_method|
+------------+--------------------+--------------------+----------------+--------------+-------+---------+--------------------+
|       62083|2020-02-29 18:32:...|2020-03-01 11:47:...|             176|           267|   1993| Customer|                null|
|        1364|2020-02-29 23:53:...|2020-03-01 00:16:...|             375|           506|  12402| Customer|                null|
|         836|2020-02-29 23:54:...|2020-03-01 00:07:...|             375|           454|   2302| Customer|                null|
|        1004|2020-02-29 23:48:...|2020-03-01 00:05:...|             179|           200|    902| Customer|                null|
|        1007|2020-02-29 23:48:...|2020-03-01 00:05:...|             179|           200|  11578| Custome

In [55]:
# Create start_time_table
start_time_table = df.select(["start_time"]) \
            .withColumn("start_year", year("start_time")) \
            .withColumn("start_month", month("start_time")) \
            .withColumn("start_dayofyear", dayofyear("start_time")) \
            .withColumn("start_dayofmonth", dayofmonth("start_time")) \
            .withColumn("start_week", weekofyear("start_time")) \
            .withColumn("start_dayofweek", dayofweek("start_time")) \
            .withColumn("start_hour", hour("start_time"))

In [72]:
start_time_table.show(5)

+--------------------+----------+-----------+---------------+----------------+----------+---------------+----------+
|          start_time|start_year|start_month|start_dayofyear|start_dayofmonth|start_week|start_dayofweek|start_hour|
+--------------------+----------+-----------+---------------+----------------+----------+---------------+----------+
|2020-02-29 18:32:...|      2020|          2|             60|              29|         9|              7|        18|
|2020-02-29 23:53:...|      2020|          2|             60|              29|         9|              7|        23|
|2020-02-29 23:54:...|      2020|          2|             60|              29|         9|              7|        23|
|2020-02-29 23:48:...|      2020|          2|             60|              29|         9|              7|        23|
|2020-02-29 23:48:...|      2020|          2|             60|              29|         9|              7|        23|
+--------------------+----------+-----------+---------------+---

In [58]:
# Create end_time_table
end_time_table = df.select(["end_time"]) \
            .withColumn("end_year", year("end_time")) \
            .withColumn("end_month", month("end_time")) \
            .withColumn("end_dayofyear", dayofyear("end_time")) \
            .withColumn("end_dayofmonth", dayofmonth("end_time")) \
            .withColumn("end_week", weekofyear("end_time")) \
            .withColumn("end_dayofweek", dayofweek("end_time")) \
            .withColumn("end_hour", hour("end_time"))

In [73]:
end_time_table.show(5)

+--------------------+--------+---------+-------------+--------------+--------+-------------+--------+
|            end_time|end_year|end_month|end_dayofyear|end_dayofmonth|end_week|end_dayofweek|end_hour|
+--------------------+--------+---------+-------------+--------------+--------+-------------+--------+
|2020-03-01 11:47:...|    2020|        3|           61|             1|       9|            1|      11|
|2020-03-01 00:16:...|    2020|        3|           61|             1|       9|            1|       0|
|2020-03-01 00:07:...|    2020|        3|           61|             1|       9|            1|       0|
|2020-03-01 00:05:...|    2020|        3|           61|             1|       9|            1|       0|
|2020-03-01 00:05:...|    2020|        3|           61|             1|       9|            1|       0|
+--------------------+--------+---------+-------------+--------------+--------+-------------+--------+
only showing top 5 rows



In [84]:
# Write trip talbe
df.write.partitionBy(['start_station_id']).parquet(os.path.join(output_data, 'trips'), 'overwrite')

In [80]:
# Write start time table
start_time_table.write.partitionBy(['start_year', 'start_month']).parquet(os.path.join(output_data, 'start_time'), 'overwrite')

+------------+--------------------+--------------------+----------------+--------------+-------+---------+--------------------+
|duration_sec|          start_time|            end_time|start_station_id|end_station_id|bike_id|user_type|rental_access_method|
+------------+--------------------+--------------------+----------------+--------------+-------+---------+--------------------+
|       62083|2020-02-29 18:32:...|2020-03-01 11:47:...|             176|           267|   1993| Customer|                null|
|        1364|2020-02-29 23:53:...|2020-03-01 00:16:...|             375|           506|  12402| Customer|                null|
|         836|2020-02-29 23:54:...|2020-03-01 00:07:...|             375|           454|   2302| Customer|                null|
|        1004|2020-02-29 23:48:...|2020-03-01 00:05:...|             179|           200|    902| Customer|                null|
|        1007|2020-02-29 23:48:...|2020-03-01 00:05:...|             179|           200|  11578| Custome

In [None]:
# Write end time table
end_time_table.write.partitionBy(['end_year', 'end_month']).parquet(os.path.join(output_data, 'end_time'), 'overwrite')

In [88]:
start_time_table.groupby(start_time_table.start_hour) \
.count() \
.sort(col("count").desc()) \
.show(20)

+----------+------+
|start_hour| count|
+----------+------+
|        17|328194|
|         8|312658|
|        18|257674|
|         9|240115|
|        16|211646|
|        19|157631|
|         7|156230|
|        15|136340|
|        12|124093|
|        13|121510|
|        10|121138|
|        14|117731|
|        11|113015|
|        20| 99371|
|        21| 70119|
|         6| 58413|
|        22| 47480|
|        23| 28044|
|         0| 15595|
|         5| 14666|
+----------+------+
only showing top 20 rows



In [71]:
df.sort(col("duration_sec").desc()).show(5)

+------------+--------------------+--------------------+----------------+--------------+-------+----------+--------------------+
|duration_sec|          start_time|            end_time|start_station_id|end_station_id|bike_id| user_type|rental_access_method|
+------------+--------------------+--------------------+----------------+--------------+-------+----------+--------------------+
|        9999|2019-08-16 16:51:...|2019-08-16 19:38:...|             223|           120|   9884|Subscriber|                  No|
|        9999|2019-05-21 13:34:...|2019-05-21 16:20:...|               6|           284|   1191|  Customer|                  No|
|        9997|2019-03-15 21:50:...|2019-03-16 00:37:...|              16|             5|   6187|Subscriber|                  No|
|        9996|2019-10-19 13:57:...|2019-10-19 16:43:...|             441|           400|  12765|  Customer|                  No|
|        9995|2019-12-17 18:35:...|2019-12-17 21:22:...|              38|            23|  11521| 

In [90]:
# Most popular route
df.groupby(df.start_station_id,df.end_station_id) \
.count() \
.sort(col("count").desc()) \
.show(20)

+----------------+--------------+-----+
|start_station_id|end_station_id|count|
+----------------+--------------+-----+
|              81|            15| 5401|
|              15|             6| 5384|
|               6|            16| 4198|
|             196|           182| 4171|
|             343|            21| 4144|
|             182|           196| 3858|
|               6|            15| 3492|
|              15|            81| 3492|
|              16|            81| 3368|
|              50|            21| 3337|
|              58|             3| 3322|
|              21|            30| 3110|
|              81|            22| 3077|
|              16|             6| 3073|
|              50|            15| 2992|
|             296|           280| 2937|
|              30|            21| 2885|
|              58|            21| 2851|
|              22|            30| 2799|
|             195|           182| 2724|
+----------------+--------------+-----+
only showing top 20 rows



In [15]:
df.where(col("rental_access_method") != "null").show(5)

+------------+-------------------+-------------------+----------------+--------------+-------+----------+--------------------+
|duration_sec|         start_time|           end_time|start_station_id|end_station_id|bike_id| user_type|rental_access_method|
+------------+-------------------+-------------------+----------------+--------------+-------+----------+--------------------+
|         298|2020-02-01 20:14:08|2020-02-01 20:19:06|             473|           415| 885034|Subscriber|                 app|
|         393|2020-02-03 18:33:39|2020-02-03 18:40:12|             473|           415| 470367|Subscriber|                 app|
|         284|2020-02-16 19:45:10|2020-02-16 19:49:54|             473|           415| 530223|Subscriber|                 app|
|         296|2020-02-08 20:40:25|2020-02-08 20:45:22|             473|           415| 814402|Subscriber|                 app|
|         799|2020-02-04 11:43:38|2020-02-04 11:56:57|             425|           425| 100235|  Customer|      

In [39]:
start_station_id_null = df.where(col("start_station_id").isNull())

In [41]:
end_station_id_null = df.where(col("end_station_id").isNull())

In [49]:
start_station_null_unique = start_station_id_null \
                            .groupby(start_station_id_null.start_station_latitude, start_station_id_null.start_station_longitude) \
                            .count() \
                            .sort(col("count").desc()) \
                            .show(20)

+----------------------+-----------------------+-----+
|start_station_latitude|start_station_longitude|count|
+----------------------+-----------------------+-----+
|             37.776018|        -122.3923376667|   11|
|         37.7611943333|        -122.4126463333|    3|
|         37.7771103333|        -122.4176393333|    2|
|            37.7338085|        -122.3907331667|    2|
|         37.3404381667|        -121.8942118333|    2|
|         37.7579751667|        -122.3887468333|    2|
|            37.7508025|        -122.4163483333|    2|
|            37.7537925|            -122.421005|    2|
|         37.7569271667|        -122.4085536667|    2|
|             37.771127|        -122.4081616667|    2|
|         37.7609678333|        -122.4326971667|    2|
|            37.7617205|        -122.4351831667|    2|
|         37.7620843333|        -122.3897693333|    2|
|         37.7632323333|        -122.4217113333|    2|
|         37.7635276667|        -122.4179188333|    2|
|         

In [48]:
start_station_id_null.count()

566291

In [25]:
df = df['duration_sec', 'start_time', 'end_time', 'start_station_id', 'end_station_id', 'bike_id', 'user_type', 'rental_access_method'] \
    .distinct()

In [92]:
data = spark.read.csv('/Users/shuaishao/udacity/bike-share-data-engineering/dataset/trip_data/201901-fordgobike-tripdata.csv', header=True)

In [93]:
data.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'bike_share_for_all_trip']

In [94]:
data.show(5)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------------+
|duration_sec|          start_time|            end_time|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|bike_share_for_all_trip|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------------+
|       80825|2019-01-31 17:57:...|2019-02-01 16:24:...|             229|Foothill Blvd at ...|            37.7757452|           -122.2130372|           196|Grand Ave at Perk...|        37.808893934|      -122