In [7]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

In [2]:
from datetime import datetime
def strip_time(x):
    x = x.strip("\"")
    try:
        
        return datetime.strptime(x,'%Y-%m-%d %H:%M:%S')
    except:
        return None

In [3]:
sc.textFile("../Data/bike_share/status_million.csv").first()

                                                                                

'10,7,8,"2014-12-30 15:37:02"'

In [4]:
status = sc.textFile("../Data/bike_share/status_million.csv")\
            .map(lambda x: x.split(','))\
            .map(lambda x: [int(x[0]),int(x[1]),
                            int(x[2]),strip_time(x[-1])])
status.first()

[10, 7, 8, datetime.datetime(2014, 12, 30, 15, 37, 2)]

## schema : station_id (int), num_bikes_available (int), num_docks_available (int), timestamp(timestamp)

In [5]:
schema = StructType([StructField('station_id',IntegerType(),False),
                    StructField('num_bikes_available',IntegerType(), True),
                    StructField('num_docks_available',IntegerType(),True),
                    StructField('timestamp',TimestampType(),True)])
df = ss.createDataFrame(status,schema)

In [6]:
df.show(5)

                                                                                

+----------+-------------------+-------------------+-------------------+
|station_id|num_bikes_available|num_docks_available|          timestamp|
+----------+-------------------+-------------------+-------------------+
|        10|                  7|                  8|2014-12-30 15:37:02|
|        10|                  7|                  8|2014-12-30 15:35:02|
|        10|                  7|                  8|2014-12-30 15:34:02|
|        10|                  7|                  8|2014-12-30 15:33:02|
|        10|                  7|                  8|2014-12-30 15:32:02|
+----------+-------------------+-------------------+-------------------+
only showing top 5 rows



In [11]:
df.where("station_id==10")\
    .select('station_id','timestamp','num_bikes_available',
            lag('num_bikes_available',1).over(Window.partitionBy().orderBy('timestamp')).alias('prev_num_bikes_available'))\
    .show(5)

22/02/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/22 00:21:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/22 00:22:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|prev_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                    null|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
+----------+-------------------+-------------------+------------------------+
only showing top 5 rows



                                                                                

In [12]:
df.where("station_id==10")\
.select('station_id','timestamp','num_bikes_available',
       lead('num_bikes_available',1).over(Window.partitionBy().orderBy('timestamp')).alias('next_num_bikes_available'))\
.show(5)

22/02/22 00:26:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/22 00:26:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/22 00:27:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|next_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                       9|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
+----------+-------------------+-------------------+------------------------+
only showing top 5 rows



                                                                                

In [16]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
df.filter("station_id==10")\
    .select('station_id','timestamp','num_bikes_available',\
lag('num_bikes_available',1).over(Window.partitionBy('station_id').orderBy('timestamp')).alias('prev_num_bikes_available'))\
.show()



+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|prev_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                    null|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
|        10|2014-09-01 00:05:02|                  9|                       9|
|        10|2014-09-01 00:06:02|                  9|                       9|
|        10|2014-09-01 00:07:02|                  9|                       9|
|        10|2014-09-01 00:08:02|                  9|                       9|
|        10|2014-09-01 00:09:03|                  9|            

                                                                                

In [None]:
from datetime import datetime
def toIntSafe(inval):
    try:
        return int(inval)
    except:
        return None

def toTimeSafe(inval):
    inval = inval.strip("\"") # Timestamp starting and ending with a double quotation mark.
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S")
    except:
        return None

In [None]:
schema = StructType([ StructField("station_id", IntegerType(), False),
                      StructField("num_bikes_available", IntegerType(), True),
                      StructField("num_docks_available", IntegerType(), True),
                      StructField("timestamp", TimestampType(), True)
                    ])

In [None]:
status_transformed = status.map(lambda x : x.split(","))\
                           .map(lambda x : (int(x[0]), toIntSafe(x[1]), toIntSafe(x[2]), toTimeSafe(x[3])))

In [None]:
status_df = ss.createDataFrame(status_transformed, schema)

In [None]:
status_df.show(5)

In [None]:
status_df.filter('station_id == 10').show()

## Return the current and previous number of bike at station_id, 10 at each time data was collected order by timestamp.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [22]:
df.filter("station_id==10")\
    .select('station_id','timestamp','num_bikes_available',
    lead('num_bikes_available',1).over(Window.partitionBy('station_id').orderBy('timestamp')).alias('next_num_bikes_available'))\
    .show(5)



+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|next_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                       9|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
+----------+-------------------+-------------------+------------------------+
only showing top 5 rows



                                                                                

In [13]:
sc.stop()