In [1]:
%fs ls dbfs:/

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/databricks/,databricks/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/dbacademy/,dbacademy/,0
dbfs:/default/,default/,0
dbfs:/loan_by_state.parquet/,loan_by_state.parquet/,0
dbfs:/local_disk0/,local_disk0/,0
dbfs:/ml/,ml/,0
dbfs:/mnt/,mnt/,0


In [2]:
%sql
CREATE DATABASE IF NOT EXISTS dbacademy;
USE dbacademy;

In [3]:
sqlContext.setConf("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)
#by default Spark creates 200 files 
#number of partitions should match the number of clusters for optimum usage of clusters

In [4]:
#connection to AWS s3 and mounting the files to hdfs
import urllib
access_key = "access_key"
secret_key = "secret_key"
s_s=urllib.parse.quote(secret_key,"")
bucket_name = "bucket_name"
mount_name = "mount_name"
dbutils.fs.mount("s3n://%s:%s@%s" % (access_key, s_s, bucket_name), "/mnt/%s" % mount_name)

In [5]:
%fs rm -r dbfs:/mnt/training/healthcare/silver/recordings_parsed

In [6]:
source_dir = "dbfs:/mnt/training/healthcare/"

basePath = "dbfs:/dbacademy/streaming-delta"

streamingPath          = basePath + "/source"
bronzePath             = basePath + "/bronze"
recordingsParsedPath   = basePath + "/silver/recordings_parsed"
recordingsEnrichedPath = basePath + "/silver/recordings_enriched"
dailyAvgPath           = basePath + "/gold/dailyAvg"

checkpointPath               = basePath + "/checkpoints"
bronzeCheckpoint             = basePath + "/checkpoints/bronze"
recordingsParsedCheckpoint   = basePath + "/checkpoints/recordings_parsed"
recordingsEnrichedCheckpoint = basePath + "/checkpoints/recordings_enriched"
dailyAvgCheckpoint           = basePath + "/checkpoints/dailyAvgPath"

In [7]:
dbutils.fs.rm(basePath, True)

In [8]:
class FileArrival:
  def __init__(self):
    self.source = source_dir + "/tracker/streaming/"
    self.userdir = streamingPath + "/"
    self.curr_mo = 1
    
  def arrival(self, continuous=False):
    if self.curr_mo > 12:
      print("Data source exhausted\n")
    elif continuous == True:
      while self.curr_mo <= 12:
        curr_file = f"{self.curr_mo:02}.json"
        dbutils.fs.cp(self.source + curr_file, self.userdir + curr_file)
        self.curr_mo += 1
    else:
      curr_file = f"{str(self.curr_mo).zfill(2)}.json"
      dbutils.fs.cp(self.source + curr_file, self.userdir + curr_file)
      self.curr_mo += 1
      
NewFile = FileArrival()

In [9]:
NewFile.arrival()
display(dbutils.fs.ls(streamingPath))

path,name,size
dbfs:/dbacademy/streaming-delta/source/01.json,01.json,506710
dbfs:/dbacademy/streaming-delta/source/02.json,02.json,711589
dbfs:/dbacademy/streaming-delta/source/03.json,03.json,1652829
dbfs:/dbacademy/streaming-delta/source/04.json,04.json,1614958
dbfs:/dbacademy/streaming-delta/source/05.json,05.json,971481
dbfs:/dbacademy/streaming-delta/source/06.json,06.json,811543
dbfs:/dbacademy/streaming-delta/source/07.json,07.json,2004546
dbfs:/dbacademy/streaming-delta/source/08.json,08.json,1854379
dbfs:/dbacademy/streaming-delta/source/09.json,09.json,1288572
dbfs:/dbacademy/streaming-delta/source/10.json,10.json,1202012


In [10]:
(spark.readStream
  .format("text")
  .schema("data STRING")
  .option("maxFilesPerTrigger", 1)  # This is used for testing to simulate 1 file arriving at a time.  Generally, don't set this in production.
  .load(streamingPath)
  .createOrReplaceTempView("recordings_raw_temp"))

In [11]:
%sql
CREATE OR REPLACE TEMPORARY VIEW recordings_bronze_temp AS (
  SELECT current_timestamp() receipt_time, "recordings" dataset, *
  FROM recordings_raw_temp
)

In [12]:
%sql
select * from recordings_raw_temp limit 2;

data
"{""device_id"":23,""heartrate"":96.2296475916,""mrn"":40580129,""time"":1.578301316998366E9}"
"{""device_id"":23,""heartrate"":93.4442586858,""mrn"":40580129,""time"":1.5783022165450108E9}"


In [13]:
%sql
select * from recordings_bronze_temp limit 2;

receipt_time,dataset,data
2020-04-21T03:02:12.477+0000,recordings,"{""device_id"":23,""heartrate"":96.2296475916,""mrn"":40580129,""time"":1.578301316998366E9}"
2020-04-21T03:02:12.477+0000,recordings,"{""device_id"":23,""heartrate"":93.4442586858,""mrn"":40580129,""time"":1.5783022165450108E9}"


In [14]:
(spark.table("recordings_bronze_temp")
  .writeStream
  .format("delta")
  .option("checkpointLocation", bronzeCheckpoint)
  .outputMode("append")
  .start(bronzePath))

In [15]:
%fs ls dbfs:/dbacademy/streaming-delta/checkpoints/bronze

path,name,size
dbfs:/dbacademy/streaming-delta/checkpoints/bronze/commits/,commits/,0
dbfs:/dbacademy/streaming-delta/checkpoints/bronze/metadata,metadata,45
dbfs:/dbacademy/streaming-delta/checkpoints/bronze/offsets/,offsets/,0
dbfs:/dbacademy/streaming-delta/checkpoints/bronze/sources/,sources/,0


In [16]:
%fs ls dbfs:/dbacademy/streaming-delta/bronze

path,name,size
dbfs:/dbacademy/streaming-delta/bronze/_delta_log/,_delta_log/,0
dbfs:/dbacademy/streaming-delta/bronze/part-00000-9a6fbe91-9426-4ca3-b18f-7bac3113836d-c000.snappy.parquet,part-00000-9a6fbe91-9426-4ca3-b18f-7bac3113836d-c000.snappy.parquet,181993


In [17]:
# Display how many records are in our table so we can watch it grow.

display(spark.readStream.format("delta").load(bronzePath).groupBy().count())

count
154767


In [18]:
(spark.readStream
       .format('delta')
       .load(bronzePath)
       .createOrReplaceTempView('bronze_unparsed_temp'))

In [19]:
%sql
create or replace temp view recordings_parsed_temp as 
  select json.device_id device_id, json.mrn mrn, json.heartrate heartrate, json.time time
  from (
    select from_json(data, "device_id Integer, mrn LONG, heartrate DOUBLE, time DOUBLE") json 
    from bronze_unparsed_temp
    where dataset = "recordings")

In [20]:
%sql
select count(*) from recordings_parsed_temp

count(1)
154767


In [21]:
(spark.table("recordings_parsed_temp")
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", recordingsParsedCheckpoint)
  .start(recordingsParsedPath))
#As new files arrived and are parsed into the upstream table, this query will automatically pick up those changes.

In [22]:
%sql
select * from recordings_parsed_temp limit 5

device_id,mrn,heartrate,time
23,40580129,96.2296475916,1578301316.998366
23,40580129,93.4442586858,1578302216.5450108
23,40580129,94.2165936572,1578303111.2630153
23,40580129,92.1651460706,1578304008.8954644
23,40580129,91.3190626151,1578304913.7983682


In [23]:
(spark
  .read
  .format("csv")
  .schema("mrn STRING, name STRING")
  .option("header", True)
  .load(f"{source_dir}/patient/patient_info.csv")
  .createOrReplaceTempView("pii"))

In [24]:
%sql
select * from pii limit 5

mrn,name
23940128,Caitlin Garcia
18064290,Anthony Perez
95384990,Tanya Diaz
53057176,Autumn Calderon
96005424,Ronald Smith


In [25]:
(spark.readStream
  .format("delta")
  .load(recordingsParsedPath)
  .createOrReplaceTempView("silver_recordings_temp"))

In [26]:
%sql
create or replace temp view recordings_w_pii as (
select device_id, a.mrn, b.name, cast(from_unixtime(time, "yyyy-mm-dd HH:MM:SS") as timestamp) time, heartrate 
from silver_recordings_temp a
inner join pii b
on a.mrn = b.mrn
where heartrate > 0)

In [27]:
%sql
select * from silver_recordings_temp limit 5

device_id,mrn,heartrate,time
23,40580129,54.0122153343,1580515318.7989018
17,52804177,92.5136468131,1580515375.925345
37,65300842,52.1354807863,1580515738.293533
23,40580129,54.6477014191,1580516211.1955204
17,52804177,95.033344842,1580516288.7768724


In [28]:
display(spark.readStream.format("delta").load(recordingsEnrichedPath).groupBy().count())

count
146796


In [29]:
NewFile.arrival()

In [30]:
(spark.table("recordings_w_pii")
  .writeStream
  .format("delta")
  .option("checkpointLocation", recordingsEnrichedCheckpoint)
  .outputMode("append")
  .start(recordingsEnrichedPath))

In [31]:
%sql
select * from recordings_w_pii limit 10

device_id,mrn,name,time,heartrate
23,40580129,Nicholas Spears,2020-01-01T00:02:00.000+0000,54.0122153343
17,52804177,Lynn Russell,2020-02-01T00:02:00.000+0000,92.5136468131
37,65300842,Samuel Hughes,2020-08-01T00:02:00.000+0000,52.1354807863
23,40580129,Nicholas Spears,,54.6477014191
17,52804177,Lynn Russell,,95.033344842
37,65300842,Samuel Hughes,,57.3391541312
23,40580129,Nicholas Spears,,56.6165053697
17,52804177,Lynn Russell,,94.8134313932
37,65300842,Samuel Hughes,,56.2469995332
23,40580129,Nicholas Spears,,54.8372685558


In [32]:
%sql
select approx_count_distinct(device_id) from recordings_w_pii

approx_count_distinct(device_id)
34


In [33]:
(spark.readStream
  .format("delta")
  .load(recordingsEnrichedPath)
  .createOrReplaceTempView("recordings_enriched_temp"))


#Creating GOLD table

In [35]:
%sql
CREATE OR REPLACE TEMP VIEW patient_avg AS (
  SELECT mrn, name, MEAN(heartrate) avg_heartrate, date_trunc("DD", time) date
  FROM recordings_enriched_temp
  GROUP BY mrn, name, date_trunc("DD", time));
select * from patient_avg limit 5;

mrn,name,avg_heartrate,date
18477029,Sean Brown,93.30678479798958,2020-07-24T00:00:00.000+0000
38299723,Cynthia Figueroa,67.94627273456668,2020-07-31T00:00:00.000+0000
41675882,Crystal Ho,75.24604093537916,2020-05-19T00:00:00.000+0000
53057176,Autumn Calderon,77.65769185481112,2020-01-21T00:00:00.000+0000
84682617,Kyle Cruz,97.17093971638748,2020-03-10T00:00:00.000+0000


#storing the GOLD table data in "DELTA" format
#when there is aggregation of data (group by), that data should not be used for streaming out into files unless there is watermark() used along with the query

In [37]:
(spark.table("patient_avg")
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", dailyAvgCheckpoint)
  .trigger(once=True)
  .start(dailyAvgPath)
)

In [38]:
spark.sql("""
  DROP TABLE IF EXISTS daily_patient_avg
""")
spark.sql(f"""
  CREATE TABLE daily_patient_avg
  USING DELTA
  LOCATION '{dailyAvgPath}'
""")

In [39]:
%sql
select count(mrn), mrn, MEAN(avg_heartrate)  from daily_patient_avg group by mrn order by mrn ;

count(mrn),mrn,mean(avg_heartrate)
30,18477029,93.37350982377508
16,20793791,83.24662359031231
28,38299723,74.86987454907126
61,40580129,84.55502699637036
37,46722881,86.591879954544
61,52804177,79.4291890902921
17,53962192,88.7334859443876
61,55827205,83.01916390348192
62,65300842,83.15765277869727
32,77385574,82.2459158438508


In [40]:
%sql
SELECT * 
FROM daily_patient_avg
WHERE date BETWEEN "2020-01-17" AND "2020-01-31"

mrn,name,avg_heartrate,date
40580129,Nicholas Spears,84.15279162254545,2020-01-18T00:00:00.000+0000
40580129,Nicholas Spears,90.19226846513912,2020-01-29T00:00:00.000+0000
40580129,Nicholas Spears,83.87160423675499,2020-01-26T00:00:00.000+0000
46722881,Rachel Contreras,96.9537797271,2020-01-26T00:00:00.000+0000
40580129,Nicholas Spears,87.28136623931819,2020-01-23T00:00:00.000+0000
40580129,Nicholas Spears,75.8357679881857,2020-01-31T00:00:00.000+0000
40580129,Nicholas Spears,88.7728571047,2020-01-25T00:00:00.000+0000
40580129,Nicholas Spears,87.6824889677381,2020-01-21T00:00:00.000+0000
46722881,Rachel Contreras,90.791853455,2020-01-24T00:00:00.000+0000
40580129,Nicholas Spears,82.43796936707,2020-01-28T00:00:00.000+0000


In [41]:
NewFile.arrival(continuous=True)

In [42]:
for s in spark.streams.active:
    s.stop()