In [5]:
# Notebook setup: PySpark imports plus the shared SparkSession (`spark`)
# that every later cell uses.
import pyspark
from pyspark.sql import SparkSession
# NOTE: these two names replace Python's built-in min/max for the rest of the
# notebook with the Spark column aggregates of the same name.
from pyspark.sql.functions import min, max
# Wildcard import; supplies IntegerType, which run() uses for its casts.
from pyspark.sql.types import *
# Reuse the active session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
# Wildcard import; supplies col(), which run() uses. It re-imports min/max too.
# NOTE(review): prefer explicit imports so the origin of each name is visible.
from pyspark.sql.functions import *
# Imported after the wildcard above, so `datetime` is the stdlib module here.
import datetime
# NOTE(review): SQLContext is imported but no instance is created in the cells
# visible here - confirm whether anything still needs it.
from pyspark.sql import SQLContext

In [8]:
# Load the original Divvy 2017 Q1 trip table and preview it.
# The first CSV line is the header; column types are inferred from the data.
trips = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/zhaoxiangyu/DIvvy/Divvy_Trips_2017_Q1.csv")
)
trips.show()

+--------+------------------+------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
| trip_id|        start_time|          end_time|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+--------+------------------+------------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|13518905|3/31/2017 23:59:07| 4/1/2017 00:13:24|  5292|         857|             66|Clinton St & Lake St|          171|May St & Cullerto...|Subscriber|  Male|     1989|
|13518904|3/31/2017 23:56:25| 4/1/2017 00:00:21|  4408|         236|            199|Wabash Ave & Gran...|           26|McClurg Ct & Illi...|Subscriber|  Male|     1990|
|13518903|3/31/2017 23:55:33| 4/1/2017 00:01:21|   696|         348|            520|Greenview Ave & J...|          432| Clark St & Lunt Ave|Subscriber|Fema

In [4]:
def _trip_time_fields(row):
    """Break one trip row's start/end timestamps into the 13 fields used by run().

    Expects `row.start_time` and `row.end_time` to be strings shaped like
    "M/D/YYYY HH:MM:SS" (the format shown in the raw Divvy table).

    Returns a list, in this order:
        start month, day, year, hour, minute   (strings)
        end   month, day, year, hour, minute   (strings)
        ISO week number and ISO weekday (1 = Monday) of the START date (ints)
        trip_id
    """
    start_parts = row.start_time.split(" ")
    end_parts = row.end_time.split(" ")
    start_date = start_parts[0].split("/")
    start_clock = start_parts[1].split(":")
    end_date = end_parts[0].split("/")
    end_clock = end_parts[1].split(":")

    # Source dates are month/day/year; datetime.date wants year, month, day.
    iso = datetime.date(
        int(start_date[2]), int(start_date[0]), int(start_date[1])
    ).isocalendar()

    return [
        start_date[0], start_date[1], start_date[2],
        start_clock[0], start_clock[1],
        end_date[0], end_date[1], end_date[2],
        end_clock[0], end_clock[1],
        iso[1], iso[2],
        row.trip_id,
    ]


def run(file, base_dir="/zhaoxiangyu/DIvvy"):
    """Aggregate one Divvy trips CSV into hourly per-station start/end counts.

    Reads ``<base_dir>/<file>.csv``, derives calendar fields from each trip's
    timestamps, counts trips per (day, year, hour, week, day_of_week, station)
    once by departure station and once by arrival station, joins the two
    counts, and writes the result with a header row to
    ``<base_dir>/training_sets/<file>_training.csv``.

    Args:
        file: Base name of the input CSV, without the ".csv" extension.
        base_dir: Directory holding the input CSV and the ``training_sets``
            output directory. Defaults to the directory the other notebook
            cells read from. (The previous hard-coded "~/DIvvy" relied on
            ``~`` expansion, which Spark path resolution does not perform.)

    Returns:
        None. Side effects: (re)registers the temp views "trips", "temp" and
        "data" on the session and overwrites the output path if it exists.
    """
    # Key step of the transformation: count entries for each station,
    # grouped by the same time period.
    trips = spark.read.csv(base_dir + "/" + file + ".csv", header=True, inferSchema=True)
    trips.createOrReplaceTempView("trips")
    temp = spark.sql("select trip_id,start_time,end_time,bikeid,tripduration,from_station_name,to_station_name from trips")
    # Not queried below; registration kept in case other cells rely on it.
    temp.createOrReplaceTempView("temp")

    time_fields = temp.rdd.map(_trip_time_fields)

    # Assign a name and an integer type to each positional column (_1 .. _13).
    field_names = [
        "start_month", "start_day", "start_year", "start_hour", "start_min",
        "end_month", "end_day", "end_year", "end_hour", "end_min",
        "week", "day_of_week", "trip_id",
    ]
    # Use the session created at the top of the notebook; no `sqlContext`
    # object is defined in the notebook, so the old call raised NameError.
    time = spark.createDataFrame(time_fields).select(
        *[
            col("_" + str(position)).cast(IntegerType()).alias(name)
            for position, name in enumerate(field_names, 1)
        ]
    )

    temp4 = spark.sql("select trip_id,bikeid,tripduration,from_station_name,to_station_name from trips")
    data = time.join(temp4, "trip_id")
    # Not queried below; registration kept in case other cells rely on it.
    data.createOrReplaceTempView("data")

    time_keys = ['start_day', 'start_year', 'start_hour', 'week', 'day_of_week']

    # Outflow: trips leaving each station in each time bucket.
    data_start_count = (
        data.groupBy(time_keys + ['from_station_name'])
        .count()
        .withColumnRenamed('count', 'start_count')
        .withColumnRenamed("from_station_name", "station_name")
    )
    # Inflow: trips arriving at each station.
    # NOTE(review): arrivals are bucketed by the trip's START time, not its
    # end time - confirm this is intended.
    data_end_count = (
        data.groupBy(time_keys + ['to_station_name'])
        .count()
        .withColumnRenamed('count', 'end_count')
        .withColumnRenamed("to_station_name", "station_name")
    )

    # Join the outflow and inflow tables so both counts sit on one row.
    # NOTE(review): this is an inner join, so a station/hour with only
    # departures or only arrivals is dropped - confirm this is intended.
    training_data = data_start_count.join(data_end_count, time_keys + ['station_name'])

    # Overwrite so the cell can be re-run without "path already exists".
    training_data.write.mode("overwrite").option("header", "true").csv(
        base_dir + "/training_sets/" + file + "_training.csv"
    )

In [None]:
# Run the transformation for one quarter of 2017 data (here Q1); the other
# quarters are processed by separate run(...) cells further down.
run("Divvy_Trips_2017_Q1")

In [22]:
# Load the aggregated Q1 table from the training_sets directory
# (presumably the output written by run(file) - verify the path matches).
file = "Divvy_Trips_2017_Q1"
trips = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/zhaoxiangyu/DIvvy/training_sets/"+file+"_training.csv")
)

In [23]:
# Preview of the generated data: one row per time bucket and station, with the
# start_count and end_count columns for that station.
trips.show()

+---------+----------+----------+----+-----------+--------------------+-----------+---------+
|start_day|start_year|start_hour|week|day_of_week|        station_name|start_count|end_count|
+---------+----------+----------+----+-----------+--------------------+-----------+---------+
|        1|      2017|         7|   5|          3|Clinton St & Wash...|         35|       10|
|        1|      2017|         9|   9|          3|Orleans St & Merc...|          2|        3|
|        1|      2017|        10|  52|          7|Clifton Ave & Arm...|          1|        2|
|        1|      2017|        11|   9|          3| Canal St & Adams St|          2|        1|
|        1|      2017|        14|   9|          3|Green St & Randol...|          1|        1|
|        1|      2017|        16|   5|          3| Canal St & Adams St|          4|       33|
|        1|      2017|        17|   9|          3|  Clark St & Lake St|          6|        1|
|        1|      2017|        17|   9|          3|Clinton St

In [None]:
# Apply the same transformation to the 2017 Q3 trips file.
run("Divvy_Trips_2017_Q3")

In [None]:
# Apply the same transformation to the 2017 Q4 trips file.
run("Divvy_Trips_2017_Q4")

In [13]:
# Load the aggregated Q2 table from the training_sets directory.
# NOTE(review): no run("Divvy_Trips_2017_Q2") call is visible in this part of
# the notebook - confirm the Q2 output is generated before this cell runs.
file = "Divvy_Trips_2017_Q2"
trips = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/zhaoxiangyu/DIvvy/training_sets/"+file+"_training.csv")
)