Automating Data Pipelines with PySpark SQL

In [0]:
# Sample dataset to process; its value becomes `table_name` and drives every path/table below.
sample_choices = ["calories_burnt", "average_heart_rate"]
dbutils.widgets.dropdown("samples", sample_choices[0], sample_choices)

In [0]:
# Root path under which the staged sample files, Delta tables and stream checkpoints are written.
default_file_path = "/db_sample_files"
dbutils.widgets.text("FilePath", default_file_path)

In [0]:
# Resolve the widget values once; all later cells build paths and table names from these two.
table_name = dbutils.widgets.get("samples")
file_path = dbutils.widgets.get("FilePath")
print(table_name, file_path, sep="\n")

In [0]:
%sh
# Download the sample JSON data and the legacy CREATE TABLE .txt files into /dbfs.
# Stop at the first failed download so the cell fails visibly; otherwise a failed
# fetch (which may still leave an empty output file behind) only surfaces later as
# a confusing JSON/regex error.
set -eu
BASE_URL="https://raw.githubusercontent.com/austinyoung123/dbtestingsamples/master"
for f in calories_burnt.json average_heart_rate.json calories_burnt.txt average_heart_rate.txt; do
  wget -O "/dbfs/$f" "$BASE_URL/$f"
done

In [0]:
%sh
# Confirm the downloads landed in the DBFS root.
ls /dbfs

In [0]:
# Stage the downloads under file_path: each JSON sample gets its own directory
# (the readers below load whole directories); the .txt DDL files sit at the root.
samples = ("calories_burnt", "average_heart_rate")

dbutils.fs.mkdirs(file_path)
for sample in samples:
    dbutils.fs.mkdirs(f"{file_path}/{sample}")
for sample in samples:
    dbutils.fs.cp(f"file:/dbfs/{sample}.json", f"{file_path}/{sample}")

dbutils.fs.cp("file:/dbfs/calories_burnt.txt", file_path)
dbutils.fs.cp("file:/dbfs/average_heart_rate.txt", file_path)

In [0]:
# Show what was staged under the pipeline root.
staged_files = dbutils.fs.ls(file_path)
staged_files

In [0]:
# Infer the schema once from a single known-good local copy of the sample file.
# Streaming sources cannot infer a schema, so this StructType is passed to readStream later.
schema_location = f"file:/dbfs/{table_name}.json"
schema_file = (
    spark.read
    .format('json')
    .option("multiLine", True)
    .load(schema_location)
    .schema
)
schema_file

In [0]:
# Batch preview of the raw, still-nested JSON for the selected sample directory.
sample_dir = f'{file_path}/{table_name}'
df = (
    spark.read
    .option('multiLine', True)
    .format('json')
    .load(sample_dir)
)
display(df)

data
"List(List(List(250.6999969482422, 5, 2.506999969482422, 5014, 2018-07-21 13:53:32.530695, 24), List(365.70001220703125, 5, 3.6570000648498535, 7314, 2018-07-24 02:37:36.530805, 4), List(126.80000305175781, 13, 1.2680000066757202, 2536, 2018-07-22 14:24:34.530772, 24), List(365.70001220703125, 5, 3.6570000648498535, 7314, 2018-07-24 02:37:36.530805, 4)))"


Streaming Application

In [0]:
# Directory the streaming reader below will watch.
print(file_path, table_name, sep="/")

In [0]:
from pyspark.sql.functions import col, explode

stream_location = f'{file_path}/{table_name}'

# You can't infer the schema with a streaming source DataFrame, so reuse the
# schema captured from the local sample file (schema_file).
raw_stream = (
    spark.readStream.format('json')
    .option('multiLine', True)
    .schema(schema_file)
    .load(stream_location)
)

# Flatten data.results[] into one row per result record.
flattened_stream = (
    raw_stream
    .select(col("data.*"))
    .withColumn("results", explode(col("results")))
    .select(col("results.*"))
)

# Ingest layer: append the flattened rows to a Delta table partitioned by user_id.
ingest_query = (
    flattened_stream.writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", f'{file_path}/Checkpoints/Ingest/{table_name}')
    .outputMode("append")
    .queryName("example_stream")
    .partitionBy("user_id")
    .start(f'{file_path}/Ingest/{table_name}')
)

# start() returns immediately; wait for the single trigger-once batch so the
# cells that read the Ingest table (including the Parse stream) see its data
# when the notebook is run top to bottom.
ingest_query.awaitTermination()

In [0]:
# Inspect the Ingest Delta table by path.
ingest_table = f"delta.`{file_path}/Ingest/{table_name}`"
display(spark.sql(f"select  * from {ingest_table}"))

calories_burnt,device_id,miles_walked,num_steps,time_stamp,user_id
365.7000122070313,5,3.6570000648498535,7314,2018-07-24 02:37:36.530805,4
365.7000122070313,5,3.6570000648498535,7314,2018-07-24 02:37:36.530805,4
250.6999969482422,5,2.506999969482422,5014,2018-07-21 13:53:32.530695,24
126.8000030517578,13,1.2680000066757202,2536,2018-07-22 14:24:34.530772,24


CREATE TABLE statements from the legacy systems are stored in .txt files. Use sc.textFile to extract those original schemas for the consumption layer.

In [0]:
import re

# USE THIS CODE IF YOU ARE LOADING IN A .TXT FILE FOR THE CREATE TABLE STATEMENT.
# Join every line so a statement that spans several lines is parsed in full;
# reading only the first line silently truncated multi-line DDL.
ddl_path = f"{file_path}/{table_name}.txt"
sample_sql = " ".join(sc.textFile(name=ddl_path, use_unicode=True).collect())

# Each match is the text following a "(" up to the next ")". The first two
# matches (presumably the main column list and the PARTITIONED BY column(s) —
# confirm against the .txt files) together form the full column list.
regex_matcher_schema = re.findall(r"\(([^)]+)", sample_sql)
if len(regex_matcher_schema) < 2:
    raise ValueError(
        f"Expected at least two parenthesised column lists in {ddl_path}, "
        f"found {len(regex_matcher_schema)}"
    )
schema_for_create_table = regex_matcher_schema[0] + ',' + regex_matcher_schema[1]

# The backtick-quoted column names, in order, as a comma-separated SELECT list.
schema_for_query = ', '.join(re.findall(r"`(.*?)`", schema_for_create_table))

In [0]:
# Column definitions for CREATE TABLE, then the bare column list used in SELECTs.
print(schema_for_create_table, schema_for_query, sep="\n")

Create a Delta table with the consumption schema

In [0]:
# Consumption-layer (Parse) table declared with the legacy schema at the path the
# parse stream writes to; partitioned by user_id like the Ingest table.
parse_table_ddl = (
    f"CREATE TABLE IF NOT EXISTS Parse_{table_name} ({schema_for_create_table}) "
    "USING DELTA "
    f"LOCATION '{file_path}/Parse/{table_name}' "
    "PARTITIONED BY (user_id)"
)
spark.sql(parse_table_ddl)

In [0]:
%sql
-- List registered tables to confirm the Parse table exists.
SHOW TABLES

database,tableName,isTemporary
default,parse_calories_burnt,False


Convert time_stamp from string to TimestampType, and use the mergeSchema option so the streamed data is merged into the consumption-layer table's schema.

In [0]:
# Imported here too so this cell does not depend on an import made in another cell.
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

# Parse layer: stream the Ingest Delta table and cast time_stamp from string
# to a real timestamp.
ingest_stream = (
    spark.readStream.format('delta')
    .load(f'{file_path}/Ingest/{table_name}')
    .withColumn('time_stamp', col('time_stamp').cast(TimestampType()))
)

# mergeSchema lets this append reconcile its schema with the Parse table
# declared from the legacy DDL.
parse_query = (
    ingest_stream.writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", f"{file_path}/Checkpoints/Parse/{table_name}")
    .option("mergeSchema", "true")
    .outputMode("append")
    .queryName("example_parse_stream")
    .partitionBy("user_id")
    .start(f'{file_path}/Parse/{table_name}')
)

# Wait for the trigger-once batch so the schema check and the dedup stream,
# which read the Parse table, run against written data.
parse_query.awaitTermination()

Compare the Ingest and Parse schemas (note the time_stamp type change and the added column "sample_new_field").

In [0]:
# Ingest-layer schema (time_stamp is still a string here). Read explicitly as
# Delta rather than relying on the cluster's default data source setting.
spark.read.format("delta").load(f'{file_path}/Ingest/{table_name}')

In [0]:
# Parse-layer schema: time_stamp cast to timestamp, plus any columns from the
# legacy DDL. Read explicitly as Delta rather than relying on the cluster default.
spark.read.format("delta").load(f'{file_path}/Parse/{table_name}')

In [0]:
# De-duplicated table with the same legacy schema and user_id partitioning as Parse.
dedup_table_ddl = (
    f"CREATE TABLE IF NOT EXISTS DeDuplicate_{table_name} ({schema_for_create_table}) "
    "USING DELTA "
    f"LOCATION '{file_path}/DeDuplicate/{table_name}' "
    "PARTITIONED BY (user_id)"
)
spark.sql(dedup_table_ddl)

"MERGE INTO" statement that both removes duplicates in my inserted file + prevents duplicates from loading in

In [0]:
def upsertToDelta(microBatchOutputDF, batchId):
  """Insert-only MERGE of one micro-batch into the DeDuplicate Delta table.

  The batch is first reduced to one row per (user_id, device_id), keeping the row
  with the latest time_stamp. That row is inserted only when the target has no row
  with the same (user_id, device_id).

  Args:
    microBatchOutputDF: micro-batch DataFrame supplied by foreachBatch.
    batchId: micro-batch id supplied by foreachBatch (unused).
  """
  # Expose the batch to SQL as the temp view "updates".
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Run the MERGE through the micro-batch DataFrame's own session, the one in
  # which "updates" was just registered.
  # ORDER BY time_stamp makes the surviving duplicate deterministic; ordering by the
  # partition keys themselves (constant within each window) picked an arbitrary row.
  microBatchOutputDF._jdf.sparkSession().sql(f"""
  MERGE INTO delta.`{file_path}/DeDuplicate/{table_name}` AS target
  USING (SELECT {schema_for_query} FROM
    (SELECT *,
       ROW_NUMBER() OVER (PARTITION BY user_id, device_id
                          ORDER BY time_stamp DESC) AS num
     FROM updates)
   WHERE num = 1) AS source
  ON source.user_id = target.user_id AND source.device_id = target.device_id
  WHEN NOT MATCHED THEN INSERT *
  """)

# Be sure not to use a filter when it is the initial load.
parsed_updates = spark.readStream.format("delta").load(f'{file_path}/Parse/{table_name}')

# Start the query to upsert. The foreachBatch sink ignores any .format(). The
# explicit checkpoint persists progress like the Ingest/Parse streams, so re-runs
# only pick up new Parse commits.
dedup_query = (
  parsed_updates.writeStream
  .foreachBatch(upsertToDelta)
  .option("checkpointLocation", f"{file_path}/Checkpoints/DeDuplicate/{table_name}")
  .trigger(once=True)
  .outputMode("update")
  .start()
)

# Wait for the trigger-once batch so the next cell reads the merged result.
dedup_query.awaitTermination()


In [0]:
# Final de-duplicated output, queried via the registered table name.
dedup_table = f"DeDuplicate_{table_name}"
display(spark.sql(f"SELECT * FROM {dedup_table}"))

calories_burnt,device_id,miles_walked,num_steps,time_stamp,sample_new_field,user_id
365.7000122070313,5,3.6570000648498535,7314,2018-07-24T02:37:36.530+0000,,4
250.6999969482422,5,2.506999969482422,5014,2018-07-21T13:53:32.530+0000,,24
126.8000030517578,13,1.2680000066757202,2536,2018-07-22T14:24:34.530+0000,,24
