<a href="https://colab.research.google.com/github/ramayer/google-colab-examples/blob/main/Spark_Streaming_more_efficient_MERGE_INTO_with_Delta_Tables.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Spark Streaming: more efficient MERGE INTO with Delta tables


### Based on

* https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
* https://github.com/delta-io/delta/issues/490
* https://kb.databricks.com/delta/delta-merge-into.html
* https://docs.microsoft.com/en-us/azure/databricks/kb/delta/delta-merge-into



#### Install Java and Spark

In [82]:
!apt-get -qq install -y openjdk-8-jdk-headless > /tmp/apt-get.out
!(wget -q --show-progress -nc https://mirrors.ocf.berkeley.edu/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz)
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

## Install PySpark and related Python libraries



In [2]:
try:
  import pyspark, findspark, delta
except:
  %pip install -q --upgrade pyspark findspark delta


[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[K     |████████████████████████████████| 198 kB 46.5 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Building wheel for delta (setup.py) ... [?25l[?25hdone


# Start a Spark Session


In [3]:
import os

import findspark

# Tell findspark (and the JVM launcher) where Java and the unpacked Spark distribution live.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

# findspark.init() has to run BEFORE pyspark is imported: it puts the pyspark that ships
# with SPARK_HOME on sys.path. Importing pyspark first would bind whatever copy pip
# installed, which may not match the Spark 3.1.2 distribution configured above.
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Reasonable for tiny one-node Spark "cluster" in Google Colab notebooks
MAX_MEMORY="8g"

# Build (or reuse) a session with the Delta Lake jar, SQL extension and catalog enabled.
spark = (
    SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .enableHiveSupport()
    .getOrCreate()
)
spark

## Create a Spark streaming pipeline that streams data from Bronze -> Silver -> Gold

In [4]:
# Start from a clean slate: drop any existing bronze table, then recreate it as an
# empty Delta table with the columns the rest of the notebook streams from.
spark.sql("DROP TABLE IF EXISTS spark_streaming_bronze")
spark.sql("""
   create table if not exists spark_streaming_bronze (
     id            bigint,
     version_id    int,
     partition_id  int, -- generated always as (id %10) 
     ts            timestamp,
     data          string
   ) USING DELTA;
""")

DataFrame[]

In [84]:
# Debugging aid: list the processes running in the notebook runtime
# (the `java` entry in the output is presumably the Spark JVM — confirm).
!ps

    PID TTY          TIME CMD
      1 ?        00:00:00 docker-init
      7 ?        00:00:06 node
     17 ?        00:00:00 tail
     35 ?        00:00:00 colab-fileshim.
     48 ?        00:00:07 jupyter-noteboo
     49 ?        00:00:04 dap_multiplexer
     59 ?        00:00:25 python3
     79 ?        00:00:09 python3
    251 ?        00:05:03 java
   2194 ?        00:00:00 ps


In [6]:
# Create three empty Delta tables that share the bronze table's schema:
# `limit(0)` keeps the columns but writes no rows, and mode "overwrite"
# replaces any table left over from a previous run.
df = spark.read.table("spark_streaming_bronze")
for silver_table in (
    "spark_streaming_silver_naive",
    "spark_streaming_silver_theoretically_better",
    "spark_streaming_silver_actually_better",
):
  df.limit(0).write.format("delta").mode("overwrite").saveAsTable(silver_table)

In [27]:
# Append 100,000 rows to the bronze table: ids come from range(100000), version_id is 1,
# partition_id is id % 10, ts is the current time and data is 'data for <id>'.
# Each run of this cell appends another batch; it does not replace existing rows.
spark.sql("""
  insert into spark_streaming_bronze (select id, 1, id%10, now(), 'data for '||id  from range(100000))
""")

DataFrame[]

## Set up the streaming operations from Bronze to Silver

In [48]:
# Streaming reader over the bronze Delta table. Each micro-batch is capped by the
# maxFilesPerTrigger / maxBytesPerTrigger options below, and "ignoreChanges" is
# enabled on the source. The last line displays whether the frame is streaming.
bronze_input_stream = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "True")
    .option("maxBytesPerTrigger", 1_000_000_000)
    .option("maxFilesPerTrigger", 10)
    .table("spark_streaming_bronze")
)
bronze_input_stream.isStreaming

True

In [69]:
# Start a streaming query that appends everything read from the bronze stream to the
# Delta table `events`, keeping its progress in the checkpoint directory /tmp/s1.
outstr = (
    bronze_input_stream.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", "/tmp/s1")
    .toTable("events")
)
outstr

<pyspark.sql.streaming.StreamingQuery at 0x7fdb75669310>

In [71]:
# Peek at the rows the streaming query has written to the `events` table so far.
spark.sql("select * from events").show()

+---+----------+------------+--------------------+-----------+
| id|version_id|partition_id|                  ts|       data|
+---+----------+------------+--------------------+-----------+
|  0|         1|           0|2021-11-26 10:32:...| data for 0|
|  1|         1|           1|2021-11-26 10:32:...| data for 1|
|  2|         1|           2|2021-11-26 10:32:...| data for 2|
|  3|         1|           3|2021-11-26 10:32:...| data for 3|
|  4|         1|           4|2021-11-26 10:32:...| data for 4|
|  5|         1|           5|2021-11-26 10:32:...| data for 5|
|  6|         1|           6|2021-11-26 10:32:...| data for 6|
|  7|         1|           7|2021-11-26 10:32:...| data for 7|
|  8|         1|           8|2021-11-26 10:32:...| data for 8|
|  9|         1|           9|2021-11-26 10:32:...| data for 9|
| 10|         1|           0|2021-11-26 10:32:...|data for 10|
| 11|         1|           1|2021-11-26 10:32:...|data for 11|
| 12|         1|           2|2021-11-26 10:32:...|data 

In [72]:
# FIXME(review): `o` is not assigned in any cell visible in this notebook, so this line
# raises NameError on a fresh kernel. It also points at "/tmp/s1", the checkpoint
# directory already used by `outstr`. Looks like leftover scratch code — confirm and
# either remove the cell or define `o` before it.
o = o.option("checkpointLocation", "/tmp/s1")


In [85]:
def upsertToDelta(microBatchOutputDF, batchId):
  '''
    Upsert one micro-batch into spark_streaming_silver_naive, keyed on `id`.

    Naive approach from https://docs.databricks.com/_static/notebooks/merge-in-streaming.html
    Can result in many unnecessary rows streamed downstream.

    microBatchOutputDF: the micro-batch DataFrame; it is exposed to SQL as the
                        temp view "updates".
    batchId:            micro-batch id; not used by this function.
    Returns 1.
  '''
  merge_sql = """
    MERGE INTO spark_streaming_silver_naive t
    USING updates s
    ON s.id = t.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """
  microBatchOutputDF.createOrReplaceTempView("updates")
  # Run the MERGE through the session that owns the micro-batch DataFrame.
  batch_session = microBatchOutputDF._jdf.sparkSession()
  batch_session.sql(merge_sql)
  return 1

# Start the naive pipeline: every micro-batch read from the bronze stream is handed to
# upsertToDelta, with progress tracked in the /tmp/naive_checkpoint_2 checkpoint.
# NOTE(review): with foreachBatch the callback does the writing, so format("delta")
# is presumably redundant here — confirm before removing or reordering the calls.
naive_output_stream = (bronze_input_stream.writeStream
                       .format("delta")
                       # Alternative: keep the query running and fire every 5 seconds.
                       #.trigger(processingTime='5 seconds')
                       # One-shot trigger instead of a continuously running query.
                       .trigger(once=True)
                       .option("checkpointLocation","/tmp/naive_checkpoint_2")
                       .foreachBatch(upsertToDelta)
                       .outputMode("update")
                       .start()
)
# Display the status right after start(); the query may still be initializing.
naive_output_stream.status

{'isDataAvailable': False,
 'isTriggerActive': False,
 'message': 'Initializing sources'}

In [88]:
# Check the query status again, after the one-shot trigger has had time to run.
naive_output_stream.status

{'isDataAvailable': False,
 'isTriggerActive': False,
 'message': 'Terminated with exception: Error while obtaining a new communication channel'}

AttributeError: ignored

# Show table histories



In [29]:
import IPython.display
import html
import datetime
import json

def html_escape(c):
  """Convert one cell value to an HTML-safe string.

  * datetime values are rendered with isoformat()
  * dict values are pretty-printed as JSON inside a <pre> element
  * everything else is rendered with str() and HTML-escaped

  Dict values that JSON cannot serialize natively (for example datetimes nested
  in a map or struct) are converted through a fallback instead of raising TypeError.
  """
  def _json_fallback(value):
    # json.dumps calls this only for values it cannot serialize itself.
    if isinstance(value, datetime.datetime):
      return value.isoformat()  # same format as top-level datetimes
    return str(value)

  if isinstance(c,datetime.datetime):
    return c.isoformat()
  if isinstance(c,dict):
    return "<pre>"+html.escape(json.dumps(c,indent=1,default=_json_fallback))+"</pre>"
  return html.escape(str(c))

def df_to_html(df, rows=10, title=None):
  """Render the first `rows` rows of a DataFrame as an HTML table string.

  df:    DataFrame to render; rows are fetched with df.take(rows).
  rows:  maximum number of rows to include.
  title: optional caption, emitted as a header row spanning all columns.

  Returns a <style> block followed by the <table> markup. The column-name
  header row is taken from the first fetched row, so a DataFrame that yields
  no rows produces a table without it.
  """
  style = """
            <style>
              tr {vertical-align:baseline;}
              table {border-collapse: collapse;	border-spacing: 0;}
              th, td {border: 1px solid black; padding:5px}
            </style>
          """
  body = []
  for position, record in enumerate(df.take(rows)):
    fields = record.asDict(True)
    if position == 0:
      # Emit the column names once, ahead of the first data row.
      header = "</th><th>".join(html.escape(name) for name in fields.keys())
      body.append("<tr><th>" + header + "</th></tr>")
    values = "</td><td>".join(html_escape(value) for value in fields.values())
    body.append("<tr><td>" + values + "</td></tr>")
  if title:
    caption = f"<tr><th colspan={len(df.columns)}>{html.escape(title)}</th></tr>"
  else:
    caption = ""
  return style + "<table>" + caption + "\n".join(body) + "</table>"


def table_history(tbl):
  """Return the `describe history` output for table `tbl`, restricted to the
  timestamp, operation, operationmetrics and operationParameters columns."""
  history = spark.sql(f"""describe history {tbl}""")
  wanted_columns = "timestamp, operation, operationmetrics,operationParameters".split(',')
  return history.selectExpr(wanted_columns)

# Render the Delta histories as HTML tables, one after another.
# NOTE(review): the third table repeats 'spark_streaming_bronze' with the title 'source',
# duplicating the first one — presumably another silver table was intended; confirm.
IPython.display.HTML(
    df_to_html(table_history('spark_streaming_bronze')      ,title='source') +
    df_to_html(table_history('spark_streaming_silver_naive'),title='silver_1') +
    df_to_html(table_history('spark_streaming_bronze')      ,title='source')
)

source,source,source,source
timestamp,operation,operationmetrics,operationParameters
2021-11-26T10:32:27,WRITE,"{  ""numOutputRows"": ""100000"",  ""numOutputBytes"": ""903476"",  ""numFiles"": ""2"" }","{  ""mode"": ""Append"",  ""partitionBy"": ""[]"" }"
2021-11-26T10:14:31,CREATE TABLE,{},"{  ""description"": null,  ""partitionBy"": ""[]"",  ""properties"": ""{}"",  ""isManaged"": ""true"" }"

silver_1,silver_1,silver_1,silver_1
timestamp,operation,operationmetrics,operationParameters
2021-11-26T10:15:04,CREATE OR REPLACE TABLE AS SELECT,"{  ""numOutputRows"": ""0"",  ""numOutputBytes"": ""632"",  ""numFiles"": ""1"" }","{  ""description"": null,  ""partitionBy"": ""[]"",  ""properties"": ""{}"",  ""isManaged"": ""true"" }"

source,source,source,source
timestamp,operation,operationmetrics,operationParameters
2021-11-26T10:32:27,WRITE,"{  ""numOutputRows"": ""100000"",  ""numOutputBytes"": ""903476"",  ""numFiles"": ""2"" }","{  ""mode"": ""Append"",  ""partitionBy"": ""[]"" }"
2021-11-26T10:14:31,CREATE TABLE,{},"{  ""description"": null,  ""partitionBy"": ""[]"",  ""properties"": ""{}"",  ""isManaged"": ""true"" }"
