### Delta Lake continued, Intro to Structured Streaming

* Delta Lake
    * Data Lake to Data Warehouse: merge into
    * Updating table definition
    * Partition column
* Structured Streaming intro

In [0]:
import pyspark.sql.functions as F
from delta.tables import DeltaTable

# We will use a movies dataset and try to simulate dimensional modelling behaviour
dbutils.fs.ls("/mnt/training/movies/1m/")
#dbutils.fs.head("/mnt/training/movies/1m/README")
#dbutils.fs.head("/mnt/training/movies/1m/movies.dat")

In [0]:
# once the logic has been tested/debugged, we can chain the read and the write
# without creating a separate variable for each dataframe
# the files have no header row, so toDF assigns the column names

(spark.read
 .option("delimiter", "::")
 .option("inferSchema", "true")
 .csv("/mnt/training/movies/1m/movies.dat")
 .toDF("MovieID","Title","Genres")
 .write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("source_movies")
)

(spark.read
 .option("delimiter", "::")
 .option("inferSchema", "true")
 .csv("/mnt/training/movies/1m/ratings.dat")
 .toDF("UserID","MovieID","Rating","Timestamp")
 .write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("source_ratings")
)

(spark.read
 .option("delimiter", "::")
 .option("inferSchema", "true")
 .csv("/mnt/training/movies/1m/users.dat")
 .toDF("UserID","Gender","Age","Occupation","Zip-code")
 .write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("source_users")
)

In [0]:
# let's separate data into smaller yearly batches for simulating incremental inserts
# Timestamp holds epoch seconds, so it is cast to timestamp and then to date before filtering
for year in range(2000, 2004):
    (spark.table("source_ratings")
     .filter(F.col("timestamp").cast("timestamp").cast("date").between(f"{year}-01-01", f"{year}-12-31"))
     .write
     .format("delta")
     .mode("overwrite")
     .saveAsTable(f"source_ratings_{year}")
    )
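
The yearly split relies on the ratings `Timestamp` column holding epoch seconds. As a rough illustration of what the cast-and-filter does, here is a plain-Python sketch. It assumes UTC (Spark uses the session time zone instead), and the sample values are made up rather than taken from `ratings.dat`.

```python
from datetime import date, datetime, timezone

def epoch_to_date(epoch_seconds):
    """Convert epoch seconds to a calendar date (UTC is an assumption of this sketch)."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).date()

def in_year(epoch_seconds, year):
    """Mirror between('YYYY-01-01', 'YYYY-12-31'), which is inclusive on both ends."""
    return date(year, 1, 1) <= epoch_to_date(epoch_seconds) <= date(year, 12, 31)

# Hypothetical sample values, not taken from the dataset
samples = [978300760, 978307200, 1009843199]

# Group each sample into its yearly batch, like the source_ratings_<year> tables
batches = {}
for ts in samples:
    for year in range(2000, 2004):
        if in_year(ts, year):
            batches.setdefault(year, []).append(ts)

print(batches)
```

Because `between` is inclusive, a rating made at any time on 31 December still lands in that year's batch once the value has been truncated to a date.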

In [0]:
# build the initial dimension and fact tables from the 2000 batch
# ValidFrom/ValidTo track row versions; ValidTo = 9999-12-31 marks the current version
spark.sql("""
CREATE OR REPLACE TABLE DimUser AS
SELECT *
, CAST('2001-01-01' as date) ValidFrom
, CAST('9999-12-31' as date) ValidTo
FROM source_users
WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
""")

spark.sql("""
CREATE OR REPLACE TABLE DimMovie AS
SELECT *
, CAST('2001-01-01' as date) ValidFrom
, CAST('9999-12-31' as date) ValidTo
FROM source_movies
WHERE movieID IN (SELECT movieID FROM source_ratings_2000)
""")

spark.sql("""
CREATE OR REPLACE TABLE FactRating AS
SELECT *
FROM source_ratings_2000
""")

In [0]:
# simulate changes in the source system so that the merges below have something to detect
spark.sql("""UPDATE source_users
SET Age = 21
WHERE Age = 1
""")

spark.sql("""UPDATE source_users
SET `Zip-code` = 12345
WHERE `Zip-code` = 10023""")

spark.sql("""UPDATE source_movies
SET Genres = 'Comedy'
WHERE MovieID = 18""")

In [0]:
# step 1: close the current version of users whose attributes changed,
# and insert users we have not seen before
spark.sql("""
MERGE INTO DimUser tgt
USING (
  SELECT * FROM source_users
  WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
  OR UserID IN (SELECT UserID FROM source_ratings_2001)
) src
ON tgt.UserID = src.UserID
WHEN MATCHED AND tgt.ValidTo = CAST('9999-12-31' as date)
  AND (tgt.Gender != src.Gender OR tgt.Age != src.Age OR tgt.Occupation != src.Occupation OR tgt.`Zip-code` != src.`Zip-code`)
  THEN UPDATE SET tgt.ValidTo = CAST('2002-01-01' as date)
WHEN NOT MATCHED THEN INSERT (UserID, Gender, Age, Occupation, `Zip-code`, ValidFrom, ValidTo) VALUES ( 
          src.UserID
          , src.Gender
          , src.Age
          , src.Occupation
          , src.`Zip-code`
          , CAST('2002-01-01' as date) 
          , CAST('9999-12-31' as date)
          )
""")

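# step 2: insert the new version of every changed user
# (their source row no longer matches a current row on all attributes)
spark.sql("""
MERGE INTO DimUser tgt
USING (
  SELECT * FROM source_users
  WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
  OR UserID IN (SELECT UserID FROM source_ratings_2001)
) src
ON tgt.UserID = src.UserID
AND tgt.Gender = src.Gender
AND tgt.Age = src.Age
AND tgt.Occupation = src.Occupation
AND tgt.`Zip-code` = src.`Zip-code`
AND tgt.ValidTo = CAST('9999-12-31' as date)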
WHEN NOT MATCHED THEN INSERT (UserID, Gender, Age, Occupation, `Zip-code`, ValidFrom, ValidTo) VALUES ( 
          src.UserID
          , src.Gender
          , src.Age
          , src.Occupation
          , src.`Zip-code`
          , CAST('2002-01-01' as date) 
          , CAST('9999-12-31' as date)
          )
""")

In [0]:
# the same two-step merge using the Delta API

dimMovieTable = DeltaTable.forName(spark, "DimMovie")
sourceMoviesDf = spark.sql("""
SELECT * FROM source_movies
WHERE MovieID IN (SELECT MovieID FROM source_ratings_2000)
OR MovieID IN (SELECT MovieID FROM source_ratings_2001)
""")

dimMovieTable.alias("tgt").merge(
  source = sourceMoviesDf.alias("src"),
  condition = "tgt.MovieID = src.MovieID"
).whenMatchedUpdate(
  condition = "tgt.ValidTo = cast('9999-12-31' as date) AND (tgt.Title != src.Title OR tgt.Genres != src.Genres)",
  set = 
  {
   "ValidTo": "cast('2002-01-01' as date)"
  }
).whenNotMatchedInsert(values =
    {
      "MovieID": "src.MovieID",
      "Title": "src.Title",
      "Genres": "src.Genres",
      "ValidFrom": "cast('2002-01-01' as date)",
      "ValidTo": "cast('9999-12-31' as date)"
    }
).execute()


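# second pass: insert the new version of every changed movie
dimMovieTable.alias("tgt").merge(
  source = sourceMoviesDf.alias("src"),
  condition = "tgt.MovieID = src.MovieID AND tgt.Title = src.Title AND tgt.Genres = src.Genres AND tgt.ValidTo = cast('9999-12-31' as date)"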
).whenNotMatchedInsert(values =
    {
      "MovieID": "src.MovieID",
      "Title": "src.Title",
      "Genres": "src.Genres",
      "ValidFrom": "cast('2002-01-01' as date)",
      "ValidTo": "cast('9999-12-31' as date)"
    }
).execute()
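
To make the two-pass merge pattern easier to follow, here is a small plain-Python sketch of the same idea on lists of dicts. It is an illustration only, not how Delta Lake implements `MERGE`. The function name `scd2_merge` and the sample rows are hypothetical. The sketch assumes one source row per key and only touches rows whose `ValidTo` is the open-ended sentinel.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # sentinel that marks the current version of a row

def scd2_merge(dim, source, key, attrs, change_date):
    """Two-pass versioned merge on lists of dicts; returns a new list."""
    dim = [dict(row) for row in dim]  # copy so the input is left untouched

    # Pass 1: close current rows whose attributes changed, insert unseen keys
    for src in source:
        matches = [tgt for tgt in dim if tgt[key] == src[key]]
        if not matches:
            dim.append({**src, "ValidFrom": change_date, "ValidTo": OPEN_END})
            continue
        for tgt in matches:
            changed = any(tgt[a] != src[a] for a in attrs)
            if changed and tgt["ValidTo"] == OPEN_END:
                tgt["ValidTo"] = change_date

    # Pass 2: insert a new current version when no current row matches on all attributes
    for src in source:
        has_current = any(
            tgt[key] == src[key]
            and tgt["ValidTo"] == OPEN_END
            and all(tgt[a] == src[a] for a in attrs)
            for tgt in dim
        )
        if not has_current:
            dim.append({**src, "ValidFrom": change_date, "ValidTo": OPEN_END})
    return dim

# Hypothetical sample rows
dim_user = [
    {"UserID": 1, "Age": 1, "ValidFrom": date(2001, 1, 1), "ValidTo": OPEN_END},
    {"UserID": 2, "Age": 30, "ValidFrom": date(2001, 1, 1), "ValidTo": OPEN_END},
]
source_users = [
    {"UserID": 1, "Age": 21},  # changed attribute
    {"UserID": 2, "Age": 30},  # unchanged
    {"UserID": 3, "Age": 40},  # new user
]

result = scd2_merge(dim_user, source_users, "UserID", ["Age"], date(2002, 1, 1))
for row in sorted(result, key=lambda r: (r["UserID"], r["ValidFrom"])):
    print(row)
```

Two passes are needed because a single matched row can either be updated or left alone, but not closed and re-inserted at the same time. The first pass closes the old version and the second adds its replacement.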

In [0]:
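# schema evolution: mergeSchema lets the append add the new "date" column to FactRating
# (saveAsTable returns nothing, so there is no point in assigning the result)

(spark.table("source_ratings_2001")
 .select("*",
         F.col("timestamp")
         .cast("timestamp")
         .cast("date")
         .alias("date")
        )
 .write
 .format("delta")
 .mode("append")
 .option("mergeSchema", "true")
 .saveAsTable("FactRating")
)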

In [0]:
# schema overwriting - mergeSchema is probably not what you want:
# even in overwrite mode it keeps the old columns and only adds the new ones

(spark.table("source_ratings")
 .select(F.col("userID").alias("user"),
         F.col("timestamp")
         .cast("timestamp")
         .alias("utc_timestamp"),
         F.col("timestamp")
         .cast("timestamp")
         .cast("date")
         .alias("utc_date")
        )
 .write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true")
 .saveAsTable("FactRating")
)

In [0]:
# instead, use overwriteSchema to replace the table schema
# note: it only works with overwrite mode, append mode is not allowed

(spark.table("source_ratings")
 .select(F.col("userID").alias("user"), 
          F.col("timestamp")
          .cast("timestamp")
          .alias("utc_timestamp"), 
          F.col("timestamp")
          .cast("timestamp")
          .cast("date")
          .alias("utc_date")
         )
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
#  .partitionBy("utc_date") # partition column: most commonly a date, ideally something you use in filters a lot
#  # it must not be too unique (low cardinality); only makes sense at roughly 1 GB of data per partition
  .saveAsTable("FactRating")
 )
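
The difference between the two options can be sketched with schemas modelled as plain dicts of column name to type. This is a simplified illustration, not Delta Lake's actual logic: it ignores case-insensitive column matching and any type widening rules, and the helper names `merge_schema` and `overwrite_schema` are made up for the example.

```python
def merge_schema(table_schema, incoming_schema):
    """Keep every existing column and append columns that are new."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            # Simplification: treat any type difference as a conflict
            raise ValueError(f"type conflict on column {name!r}")
        merged.setdefault(name, dtype)
    return merged

def overwrite_schema(table_schema, incoming_schema):
    """Throw the old schema away and use the incoming one."""
    return dict(incoming_schema)

# FactRating after the schema evolution cell (types assumed for illustration)
fact_rating = {"UserID": "int", "MovieID": "int", "Rating": "int",
               "Timestamp": "int", "date": "date"}
incoming = {"user": "int", "utc_timestamp": "timestamp", "utc_date": "date"}

print(list(merge_schema(fact_rating, incoming)))
print(list(overwrite_schema(fact_rating, incoming)))
```

With merging, the table ends up with eight columns, five of which the new data never fills. Overwriting leaves only the three columns that were actually written.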

In [0]:
# How to create tables from existing Delta data
#spark.sql("CREATE TABLE x USING DELTA LOCATION '/user/hive/warehouse/x'")

### Structured streaming

Common input/output:
* Input sources
  * Kafka (and other distributed commit logs)
  * Files on a distributed system
  * TCP/IP sockets (_note: not fault-tolerant, meant for testing_)
* Output (sinks)
  * Kafka (etc)
  * File formats
  * Spark tables

In [0]:
# let's start by looking at reading and writing streams to files
# file-based streaming sources need an explicit schema

events_schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

streaming_events_df = (spark.readStream
  .schema(events_schema)
  .option("maxFilesPerTrigger", 1) # used for example purposes, reads in 1 file per trigger
  .parquet("/mnt/training/ecommerce/events/events.parquet")
)

In [0]:
# you can check if a dataframe has streaming sources
# if it does, some operations are unavailable (e.g. count() and show())

streaming_events_df.isStreaming


In [0]:
# let's create a new dataframe and write it into a file

email_df = (streaming_events_df
            .filter(F.col("traffic_source") == "email")
            .withColumn("mobile", F.col("device").isin(["iOS", "Android"]))
            .select("user_id", "event_timestamp", "mobile")
           )

checkpoint_path = "/tmp/email_traffic/checkpoint" 
output_path = "/tmp/email_traffic/output"

# output modes:
#   append   = only new rows are written (the only mode the file sink supports)
#   complete = all rows are written on every update (aggregations only)
#   update   = only updated rows (used in aggregations, otherwise same as append)
devices_query = (email_df.writeStream
  .outputMode("append")
  .format("parquet")
  .queryName("email_traffic_query") # optional name
  .trigger(processingTime="10 seconds") # how often a new micro-batch is started
  .option("checkpointLocation", checkpoint_path) # used for fault tolerance. Note: every query needs its own unique checkpoint location
  .start(output_path) # directory where the output files will be written
)
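
The output modes are easiest to compare on an aggregation. The following plain-Python toy simulates a running count per key over a few micro-batches and shows what complete and update mode would each emit per trigger. It is a conceptual sketch, not Spark code. The function name and every key except "email" are made up for the example.

```python
from collections import Counter

def run_running_count(micro_batches):
    """Yield (complete_output, update_output) after each simulated trigger."""
    state = Counter()  # aggregation state kept between triggers
    for batch in micro_batches:
        state.update(batch)
        complete_output = dict(state)                         # whole result table
        update_output = {key: state[key] for key in set(batch)}  # only changed rows
        yield complete_output, update_output

# Hypothetical traffic_source values arriving in three micro-batches
batches = [["email", "google"], ["email"], ["youtube", "email"]]
outputs = list(run_running_count(batches))

for trigger, (complete, update) in enumerate(outputs, start=1):
    print(trigger, "complete:", complete, "update:", update)
```

Append mode is left out of the simulation. For an aggregation it can only emit a row once that row can no longer change, which in Spark requires a watermark.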

In [0]:
%fs ls /tmp/email_traffic/output

In [0]:
# monitor the query

#devices_query.id # unique per query, persisted when restarted from checkpoint
#devices_query.name
#devices_query.status # isDataAvailable = new data available, isTriggerActive = a trigger is currently running
devices_query.awaitTermination(5) # timeout in seconds; returns True if the stream has stopped, raises if it failed. Can also be used to keep the cluster awake
devices_query.stop() # note: a running stream keeps the cluster awake (processing ongoing), so stop it when done

In [0]:
# fetching from TCP

logs_df = (spark.readStream
    .format("socket")
    .option("host", "server1.databricks.training")
    .option("port", 9001)
    .load()
)

display(logs_df)

In [0]:
# we can look at a count by using the agg function

running_count_df = logs_df.agg(F.count("*"))

display(running_count_df)

In [0]:
# let's see how comfortable it is manipulating the streaming dataframe
# we want to get all rows where there is an error

# Start by parsing out the timestamp and the log data
clean_df = (logs_df
            .withColumn("ts_string", F.col("value").substr(2, 23))
            .withColumn("epoch", F.unix_timestamp("ts_string", "yyyy/MM/dd HH:mm:ss.SSS"))
            .withColumn("capturedAt", F.col("epoch").cast("timestamp"))
            .withColumn("logData", F.regexp_extract("value", r"^.*\]\s+(.*)$", 1)) # raw string keeps the regex backslashes intact
           )

# Filter on the raw line first, then keep only the columns we want
errors_df = (clean_df
             .filter(F.col("value").like("% (ERROR) %"))
             .select("capturedAt", "logData")
            )
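
The parsing steps can be tried outside Spark on ordinary strings. The sketch below mirrors the substring, timestamp parse, regex extract and `like` filter with the Python standard library. The two log lines are invented to fit the pattern the code expects. The real socket feed may look different.

```python
import re
from datetime import datetime

LOG_DATA = re.compile(r"^.*\]\s+(.*)$")  # same pattern as in regexp_extract

def parse_line(line):
    """Return (capturedAt, logData) for one raw log line."""
    ts_string = line[1:24]  # same slice as substr(2, 23): 1-based start 2, length 23
    # unix_timestamp works in whole seconds, so the milliseconds are dropped here too
    captured_at = datetime.strptime(ts_string, "%Y/%m/%d %H:%M:%S.%f").replace(microsecond=0)
    match = LOG_DATA.match(line)
    log_data = match.group(1) if match else ""  # regexp_extract gives "" when nothing matches
    return captured_at, log_data

def is_error(line):
    """Equivalent of like('% (ERROR) %') on the raw line."""
    return " (ERROR) " in line

# Hypothetical log lines
lines = [
    "[2021/03/01 12:00:00.123] (INFO) Service started",
    "[2021/03/01 12:00:01.456] (ERROR) Disk full",
]

errors = [parse_line(line) for line in lines if is_error(line)]
print(errors)
```

The filter runs on the raw line rather than on the extracted `logData`, which matches the Spark version where the `like` condition is applied to the `value` column.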

In [0]:
# let's write the errors dataframe into a delta table

(errors_df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/events_stream/_checkpoints/")
  .toTable("errors_streaming")) # toTable starts the stream and writes to a table (Spark 3.1+)

In [0]:
# we can view snapshots from this table

display(spark.table("errors_streaming")
        .sort(F.desc("capturedAt"))
       )

In [0]:
# we can also re-run this cell to watch the row count of the table increase

spark.table("errors_streaming").count()

In [0]:
# since it was a delta table, we can look at the history

errors_table = DeltaTable.forName(spark, "errors_streaming")

display(errors_table.history())

In [0]:
# use spark.streams.active to loop over all active streams
# remember to stop streams if not working on them anymore

for stream in spark.streams.active:
  stream.stop()

## Further reading

* Delta Lake MERGE INTO:
  * https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html
* Structured Streaming: 
  * https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss.html
  * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamReader.html
  * http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  * https://docs.databricks.com/spark/latest/structured-streaming/production.html

#### Task

Create a schema and a streaming dataframe for the JSON files in the following path:  
"/mnt/training/gaming_data/mobile_streaming_events_b/"
  
  
Use the following as basis for creating your schema:  
 |-- eventName: string (nullable = true)  
 |-- eventParams: struct (nullable = true)  
 |    |-- amount: double (nullable = true)  
 |    |-- app_name: string (nullable = true)  
 |    |-- app_version: string (nullable = true)  
 |    |-- client_event_time: string (nullable = true)  
 |    |-- device_id: string (nullable = true)  
 |    |-- game_keyword: string (nullable = true)  
 |    |-- platform: string (nullable = true)  
 |    |-- scoreAdjustment: long (nullable = true)  
  
Read in 2 files per trigger.
  
Create a new modified dataframe:
* keep only rows where eventName is "scoreAdjustment"
* select the *game_keyword*, *platform* and *scoreAdjustment* columns from the eventParams struct.  

Write the datastream to a delta table called score_adjustments, with the trigger set to run every 5 seconds.  
Check to make sure that the table has some data - this should also be visible in the cell results.  
Then stop the datastream.

In [0]:
# your answer