In [1]:
from pyspark.sql.types import *
from pyspark.sql import functions as psf
import json
from pyspark.sql.streaming import *
from pyspark.sql import *

In [2]:
# Configuration
# -------------
# DBFS locations used by the streaming pipeline in this notebook:
api_resp_path = '/FileStore/tables/Citi-Bike/api_response'    # landing folder the JSON stream reads from
stream_bronze = '/FileStore/tables/Citi-Bike/Stream_Bronze'   # Delta target of the bronze stream
stream_silver = '/FileStore/tables/Citi-Bike/stream_Silver'   # Delta target of the silver stream
checkpointPath = '/FileStore/tables/Citi-Bike/chkpt'          # checkpoint of the bronze stream
silverChkPtPath = '/FileStore/tables/Citi-Bike/silvr_chkpt'   # checkpoint of the silver stream

# DBFS locations of the static layers (not referenced by the streaming cells
# below; presumably shared with the batch pipeline notebook - confirm):
bronze_path = '/FileStore/tables/Citi-Bike/Bronze'
silver_path = '/FileStore/tables/Citi-Bike/Silver'
gold_path = '/FileStore/tables/Citi-Bike/Gold'

# Make sure every streaming folder exists before any stream is started:
for _streaming_dir in (api_resp_path, stream_bronze, stream_silver, checkpointPath, silverChkPtPath):
  dbutils.fs.mkdirs(_streaming_dir)

In [3]:
%sh
# Databricks shell cell (%sh): commands run on the driver, where DBFS is mounted under /dbfs.
# List the JSON response files present in the DBFS landing folder
# (the landing zone could equally be an S3 bucket, a GCS bucket or Azure Blob storage).

ls '/dbfs/FileStore/tables/Citi-Bike/api_response/'
# Optional: list the silver checkpoint folder (path matches silverChkPtPath in the config cell).
#ls '/dbfs/FileStore/tables/Citi-Bike/silvr_chkpt'

In [4]:
# Delete/clean a DBFS path when needed. The snippet below is disabled by being
# wrapped in a string literal; copy it into its own cell to run it.
# The free cluster has a storage limit of 10GB and/or 10K files.
# NOTE(review): the snippet is meant to delete RECURSIVELY, but its rm calls pass
# no recurse flag - presumably dbutils.fs.rm(path, True) is needed for
# non-empty folders; confirm against the dbutils documentation before use.
'''
root_fldr = "/FileStore/tables/Citi-Bike/chkpt"      # "/FileStore/tables/Citi-Bike/api_response"
for fil in dbutils.fs.ls(root_fldr):
  dbutils.fs.rm(fil.path)

dbutils.fs.rm(root_fldr)
'''

# Optionally inspect the Delta Lake folders before deleting them
# (paths spelled as in the config cell: stream_silver / silverChkPtPath):
#dbutils.fs.ls('/FileStore/tables/Citi-Bike/stream_Silver')
#dbutils.fs.ls('/FileStore/tables/Citi-Bike/silvr_chkpt')

# Example: remove a single Delta transaction-log entry from the bronze table.
#dbutils.fs.rm('/FileStore/tables/Citi-Bike/Stream_Bronze/_delta_log/00000000000000000204.json')

##### Get the JSON's schema by manually reading one JSON:

In [6]:
# Batch-read one sample response so Spark infers the JSON structure for us.
sample_response_file = f"{api_resp_path}/station_status_20200731_13-31-46.json"
api_respDF = spark.read.format("json").load(sample_response_file)

# Serialise the inferred schema to a JSON string ...
schema_json = api_respDF.schema.json()

# ... and rebuild a StructType from it; this is the schema handed to readStream.
schema = StructType.fromJson(json.loads(schema_json))

##### Streaming Read API Response JSON files - 1 by 1 continuously:

In [8]:
# Streaming JSON reader over the landing folder, using the schema inferred above.
# See https://docs.databricks.com/getting-started/spark/streaming.html
api_json_reader = spark.readStream.format("json").schema(schema)

# Optional throttle: pick up only one file per trigger.
api_json_reader = api_json_reader.option("maxFilesPerTrigger", 1)

stream_api_respDF = api_json_reader.load(api_resp_path)

In [9]:
# OPTIONALLY check the streaming DF content in streaming fashion (display - can show streaming DF content live):
#display(stream_api_respDF)

##### Streaming Write into Streaming BRONZE Delta:
###### Bronze layer has parsed JSONs into Struct Type - NOT Flattened or modified

In [11]:
# Append the parsed API responses, unchanged, to the bronze Delta table.
bronze_writer = stream_api_respDF.writeStream.format("delta")
bronze_writer = bronze_writer.option("checkpointLocation", checkpointPath)
bronze_writer = bronze_writer.outputMode("append")
bronze_writer = bronze_writer.trigger(processingTime='30 seconds')
bronze_writer = bronze_writer.queryName("api2bronze_stream")   # optional: names the running query

# start() launches the query; the handle is kept so it can be monitored or stopped.
streamingQuery = bronze_writer.start(stream_bronze)

In [12]:
# Give the api2bronze stream time to commit its first micro-batches before the
# bronze Delta table is opened as a streaming source in the next cell.
import time  # still needed here: a later cell calls time.sleep()

# Upper bound on the wait, in seconds (four of the 30-second triggers).
BRONZE_WARMUP_SECONDS = 120

# awaitTermination(timeout) blocks for at most `timeout` seconds, like the fixed
# sleep it replaces, but it raises immediately if the bronze query has failed
# and returns True early if the query was stopped, instead of hiding the problem.
bronze_query_ended = streamingQuery.awaitTermination(BRONZE_WARMUP_SECONDS)
if bronze_query_ended:
  print("WARNING: api2bronze_stream is no longer running; the bronze table will not receive new data.")

##### Bronze to Silver transformation (TF) on streaming data:
##### Streaming read from the streaming Bronze table >> flatten & extract the needed fields >> streaming write to the Silver Delta Lake

In [14]:
# Open the bronze Delta table as a streaming source.
streamingBronzeDF = spark.readStream.format("delta").load(stream_bronze)

# Fields copied as-is out of each exploded station struct.
station_fields = [
  "station_id",
  "num_bikes_available",
  "num_bikes_disabled",
  "num_docks_available",
  "num_docks_disabled",
  "num_ebikes_available",
  "station_status",
  "is_renting",
  "is_returning",
  "last_reported",
]

# Final silver column order: the copied fields plus the derived timestamp.
col_lst = station_fields + ["last_reported_ts"]

# explode() yields one row per element of data.stations.
stations_exploded = streamingBronzeDF.withColumn("expl_data_stations", psf.explode("data.stations"))

# Promote the nested fields to top-level columns ...
flattened_station_cols = [
  psf.col(f"expl_data_stations.{field_name}").alias(field_name)
  for field_name in station_fields
]

# ... and derive a proper timestamp from the epoch value in last_reported.
last_reported_ts_col = (
  psf.from_unixtime(psf.col("expl_data_stations.last_reported"))
  .cast("timestamp")
  .alias("last_reported_ts")
)

exploded_streamingBronzeDF = stations_exploded.select(*flattened_station_cols, last_reported_ts_col)

In [15]:
# Display/Show Streaming DF to Verify TFs:
# Works
#display(exploded_streamingBronzeDF)

##### Streaming WRITE into Silver
###### (Flattened by picking relevant fields & exploding arrays, above)

In [17]:
# Append the flattened station rows to the silver Delta table (verified working).
silver_writer = exploded_streamingBronzeDF.writeStream.format("delta")
silver_writer = silver_writer.option("checkpointLocation", silverChkPtPath)
silver_writer = silver_writer.outputMode("append")
silver_writer = silver_writer.trigger(processingTime='30 seconds')
silver_writer = silver_writer.queryName("silver_stream_tbl")   # optional: names the running query

# Launch the query; as the last expression, the returned handle is the cell output.
silver_writer.start(stream_silver)

##### Streaming READ from SILVER Delta Lake

In [19]:
# Pause for 100 seconds so the silver stream can commit data, then open the
# silver Delta table as a streaming source (verified working).
time.sleep(100)
silver_delta_reader = spark.readStream.format("delta")
streaming_silverDF = silver_delta_reader.load(stream_silver)
# Optional live preview:
#display(streaming_silverDF)

In [20]:
# Query the STREAMING DF for Data-analysis: (kept disabled to avoid load on Cluster)
# WORKS  
#display(streaming_silverDF.filter("station_status != 'active'"))

In [21]:
# Live count of silver rows whose station_status is 'active' (verified working).
active_station_rows = streaming_silverDF.where("station_status == 'active'")
active_station_count = active_station_rows.groupBy().count()
display(active_station_count)

count
312933


In [22]:
# The query below fails unless the preceding streaming write used .format("memory"):
# only the memory sink exposes the stream as a table that spark.sql can query by name.
#spark.sql("select * from silver_stream").show()
# Error when the sink is NOT memory: org.apache.spark.sql.AnalysisException: Table or view not found: silver_stream; line 1 pos 14
# Refer: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html

In [23]:
# Disabled debugging snippet (kept as a string literal so it does not run):
# it writes the flattened stream to the console sink, waits, then stops the query.
# NOTE: the "sleep 10 seconds" remark inside the snippet is inaccurate - the call sleeps 100 seconds.
'''
query = exploded_streamingBronzeDF.writeStream.format("console").start()
import time
time.sleep(100) # sleep 10 seconds
query.stop()
'''

##### Verify that the streaming queries are writing data
##### by taking row counts with static (batch) reads

In [25]:
# Sanity check of the BRONZE layer: a static (batch) read of the Delta table
# written by the bronze stream, followed by a row count.
bronze_df = spark.read.load(stream_bronze, format="delta")
bronze_row_count = bronze_df.count()
print(bronze_row_count)

In [26]:
# Sanity check of the SILVER layer: a static (batch) read of the Delta table
# written by the silver stream, followed by a row count.
slvr_df = spark.read.load(stream_silver, format="delta")
silver_row_count = slvr_df.count()
print(silver_row_count)

##### Run SQL on DIM_STATION created and registered in Catalog from Batch pipeline Notebook:
###### Make sure the DIM table is registered in Catalog

In [28]:
%sql
-- List every row of the station dimension table, sorted by station id.
SELECT *
FROM CITIBIKE.dim_station
ORDER BY station_id

station_id,station_name
83,Atlantic Ave & Fort Greene Pl
143,Clinton St & Joralemon St
173,Broadway & W 49 St
244,Willoughby Ave & Hall St
285,Broadway & E 14 St
308,St James Pl & Oliver St
327,Vesey Pl & River Terrace
476,E 31 St & 3 Ave
480,W 53 St & 10 Ave
489,10 Ave & W 28 St


#### Perform Streaming Top N Analysis - Gold Layer:
* Busiest Station-ID: Stations with most Trips (Complete)
* Stations with Longest Trips
* Stations with Shortest Trips
* Customers by Age group : 30+, 40+, 50+ yrs 
* Stations with Most Subscribers

###### Spark SQL approach:
###### Busiest Station-ID: Stations with Least Free Bikes

In [31]:
# Register the streaming silver DataFrame as the temp view "silver_stream" so the
# %sql and spark.sql() cells below can query it by name.
streaming_silverDF.createOrReplaceTempView("silver_stream")

In [32]:
%sql
-- Live view of every row arriving in the silver stream.
SELECT *
FROM silver_stream

station_id,num_bikes_available,num_bikes_disabled,num_docks_available,num_docks_disabled,num_ebikes_available,station_status,is_renting,is_returning,last_reported,last_reported_ts
72,0,51,0,4,0,active,1,1,1595977122,2020-07-28T22:58:42.000+0000
79,30,0,3,0,0,active,1,1,1595976391,2020-07-28T22:46:31.000+0000
82,22,0,5,0,0,active,1,1,1595975802,2020-07-28T22:36:42.000+0000
83,44,2,16,0,0,active,1,1,1595977029,2020-07-28T22:57:09.000+0000
116,8,1,41,0,0,active,1,1,1595977152,2020-07-28T22:59:12.000+0000
119,14,3,2,0,0,active,1,1,1595974894,2020-07-28T22:21:34.000+0000
120,2,2,15,0,0,active,1,1,1595977072,2020-07-28T22:57:52.000+0000
127,29,1,1,0,0,active,1,1,1595976564,2020-07-28T22:49:24.000+0000
128,24,4,2,0,0,active,1,1,1595977219,2020-07-28T23:00:19.000+0000
143,4,0,20,0,0,active,1,1,1595976939,2020-07-28T22:55:39.000+0000


In [33]:
# Old approach, superseded by the next code cell; kept disabled inside a string literal.
# It ranks stations by SUM(num_docks_available - num_bikes_available), joins the
# station names from CITIBIKE.dim_station and registers the result as the temp
# view "dm_busiest_stations".
# NOTE(review): the query reads FROM silver, a view this notebook does not
# register (the view created above is "silver_stream") - confirm before re-enabling.
#stations_by_cnt = #(assign the result with = to keep the Spark SQL output as a DataFrame)

'''
spark.sql(\'''
SELECT S.station_name, A.*
FROM (
   select station_id, SUM(num_docks_available - num_bikes_available) as Bike_Rental_Cnt
   FROM silver
   GROUP BY station_id
   order by Bike_Rental_Cnt DESC
) A
INNER JOIN CITIBIKE.dim_station S
ON A.station_id = S.station_id
\'''
).createOrReplaceTempView("dm_busiest_stations")
'''

#display(stations_by_cnt) # if a DataFrame had been created, its content could be shown this way

In [34]:
# Enrich the live silver stream with station names: inner join of the
# "silver_stream" view with the CITIBIKE.dim_station table on station_id.
stations_with_names_sql = '''
  SELECT S.station_name, A.station_id, A.num_bikes_available, A.last_reported_ts
  FROM silver_stream A
  INNER JOIN CITIBIKE.dim_station S
  ON A.station_id = S.station_id
  '''
realtime_busiest_stations = spark.sql(stations_with_names_sql)

# Expose the joined stream to %sql cells under its own view name.
realtime_busiest_stations.createOrReplaceTempView("realtime_busiest_stations_view")

# Optional live preview:
#display(realtime_busiest_stations)

In [35]:
# Windowed aggregation: average number of available bikes per station within
# 120-second windows over last_reported_ts.
report_window = psf.window(psf.col("last_reported_ts"), "120 seconds")
avg_bikes_available = psf.avg(psf.col("num_bikes_available")).alias("avg_num_bikes_available")

bikes_per_station_window = realtime_busiest_stations.groupBy(psf.col("station_id"), report_window)

# Newest window first; within a window, highest average first.
aggregatedDF = bikes_per_station_window.agg(avg_bikes_available).orderBy(
  psf.col("window.start").desc(),
  psf.col("avg_num_bikes_available").desc(),
)

display(aggregatedDF)

station_id,window,avg_num_bikes_available
3186,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",31.0
3629,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",24.0
3276,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",23.0
3256,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",22.0
3638,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",18.0
518,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",16.0
480,"List(2020-07-30T17:34:00.000+0000, 2020-07-30T17:36:00.000+0000)",2.0
525,"List(2020-07-30T17:32:00.000+0000, 2020-07-30T17:34:00.000+0000)",34.0
489,"List(2020-07-30T17:32:00.000+0000, 2020-07-30T17:34:00.000+0000)",33.0
3461,"List(2020-07-30T17:32:00.000+0000, 2020-07-30T17:34:00.000+0000)",27.0


In [36]:
%sql
-- Real-time SQL on the raw (un-aggregated) joined stream:
-- every silver row that has a matching station name in the dimension table.
SELECT * FROM realtime_busiest_stations_view

station_name,station_id,num_bikes_available,last_reported_ts
Atlantic Ave & Fort Greene Pl,83,44,2020-07-28T22:57:09.000+0000
Clinton St & Joralemon St,143,4,2020-07-28T22:55:39.000+0000
Broadway & W 49 St,173,0,2020-07-28T22:51:25.000+0000
Willoughby Ave & Hall St,244,6,2020-07-28T22:57:12.000+0000
Broadway & E 14 St,285,51,2020-07-28T22:58:58.000+0000
St James Pl & Oliver St,308,10,2020-07-28T23:00:18.000+0000
Vesey Pl & River Terrace,327,26,2020-07-28T23:00:29.000+0000
E 31 St & 3 Ave,476,4,2020-07-28T23:00:00.000+0000
W 53 St & 10 Ave,480,0,2020-07-28T22:56:44.000+0000
10 Ave & W 28 St,489,26,2020-07-28T22:59:36.000+0000


##### Stations with Longest Trips  -- WIP