<h1> Demo 6: Incremental Loading with Spark Pools

<h3>We start by creating a Delta table from our customer dimension

In [16]:
%%pyspark
df = spark.read.load('abfss://raw@synapseinaday.dfs.core.windows.net/Dimension/Customer/', format='parquet')

df.write.format("delta").save("/delta/CustomerDimension")




<h3> Next, we read the Delta transaction log (_delta_log) to see which entries it contains at this point in time

In [18]:
%%pyspark
# Read only the JSON commit files; checkpoint files in _delta_log are Parquet, not text
[log_line.value for log_line in spark.read.text("/delta/CustomerDimension/_delta_log/*.json").collect()]


['{"commitInfo":{"timestamp":1620412476417,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"20063","numOutputRows":"403"}}}', '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}', '{"metaData":{"id":"1edb3a3b-971c-43a4-b619-f87304c064f2","format":{"provider":"parquet","options":{}},"schemaString":"{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"CustomerKey\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"WWICustomerID\\",\\"type\\":\\"integer\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Customer\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"BillToCustomer\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"Category\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}},{\\"name\\":\\"BuyingGroup\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"me

<h3> As you can see, the log currently holds only the WRITE (insert) operation we performed, which added 403 rows (numOutputRows)
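Each commit file in `_delta_log` is newline-delimited JSON with one action per line (`commitInfo`, `protocol`, `metaData`, `add`, ...). The plain-Python sketch below is not part of the Delta API: it parses the `commitInfo` line copied from the log output above and extracts the operation and the row count. The helpers `summarize_commit` and `log_file_name` are hypothetical names used only for illustration.

```python
import json

# commitInfo action copied verbatim from the _delta_log output above
COMMIT_LINE = (
    '{"commitInfo":{"timestamp":1620412476417,"operation":"WRITE",'
    '"operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},'
    '"isBlindAppend":true,"operationMetrics":{"numFiles":"1",'
    '"numOutputBytes":"20063","numOutputRows":"403"}}}'
)


def summarize_commit(line):
    """Hypothetical helper: return (operation, rows written) for a commitInfo line,
    or None when the line holds another action (protocol, metaData, add, ...)."""
    action = json.loads(line)
    info = action.get("commitInfo")
    if info is None:
        return None
    # Delta stores operation metrics as strings, so convert the row count
    rows = int(info.get("operationMetrics", {}).get("numOutputRows", 0))
    return info["operation"], rows


def log_file_name(version):
    # Each commit file is named after its version, zero-padded to 20 digits
    return f"{version:020d}.json"


print(summarize_commit(COMMIT_LINE))  # ('WRITE', 403)
print(log_file_name(0))               # 00000000000000000000.json
```

The `protocol` and `metaData` lines in the same file return `None` here, since only `commitInfo` carries the operation metrics.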

<h3> We now create a Spark database to hold our Hive metastore tables

In [58]:
%%sql
CREATE DATABASE IF NOT EXISTS WWI_Spark


<Spark SQL result set with 0 rows and 0 fields>

<h3>Next, we register our Delta table in the WWI_Spark database

In [59]:
%%sql

CREATE TABLE IF NOT EXISTS wwi_spark.CustomerDimension USING DELTA
LOCATION '/delta/CustomerDimension/'


<Spark SQL result set with 0 rows and 0 fields>

<h3> Now we need a connection to our serverless SQL pool so we can query the Customer dimension source and produce modified rows from it

In [38]:
%%pyspark

jdbcHostname = "synapseinadaywe-ondemand.sql.azuresynapse.net"
jdbcDatabase = "WWI"


jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : "***",
  "password" : "****",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

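The `format` call in the cell above yields a SQL Server JDBC URL of the form `jdbc:sqlserver://<host>:<port>;database=<db>`. A minimal pure-Python sketch of the same string building, useful for checking the URL before passing it to `spark.read.jdbc`; `build_jdbc_url` is a hypothetical helper name. As in the cell above, the credentials stay out of the URL and travel in the separate properties dictionary.

```python
def build_jdbc_url(host, database, port=1433):
    """Hypothetical helper: build the SQL Server JDBC URL used by the notebook cell."""
    # Same layout as "jdbc:sqlserver://{0}:{1};database={2}" in the cell above
    return f"jdbc:sqlserver://{host}:{port};database={database}"


url = build_jdbc_url("synapseinadaywe-ondemand.sql.azuresynapse.net", "WWI")
print(url)
# jdbc:sqlserver://synapseinadaywe-ondemand.sql.azuresynapse.net:1433;database=WWI
```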



<h3>Next, we build a change set from the source: the Customer value for CustomerKey 1 gets a unique identifier appended, and a new customer with key 1000 is added

In [63]:
%%pyspark
CustomerQuery = """(
SELECT
     CustomerKey, WWICustomerID,
     cast(Customer + cast(NEWID() as varchar(100)) as varchar(400)) as Customer , 
     BillToCustomer, Category, BuyingGroup, PrimaryContact, PostalCode, ValidFrom, ValidTo, LineageKey
FROM
    OPENROWSET(
        BULK 'raw/Dimension/Customer/Dimension.Customer.parquet',
        DATA_SOURCE = 'RawDS',
        FORMAT='PARQUET'
    ) AS [result]
    WHERE CustomerKey = 1
UNION 
SELECT 1000,1000,'DUMMY CUSTOMER','DUMMY CUSTOMER','DUMMY CUSTOMER','DUMMY CUSTOMER','DUMMY CUSTOMER',999999,'2013-01-01T00:00:00.0000000','9999-12-31T23:59:59.9999999',2)TableA
"""

df2 = spark.read.jdbc(url=jdbcUrl, table=CustomerQuery, properties=connectionProperties)
display(df2)

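To make the change set concrete, here is a pure-Python mirror of what the T-SQL query returns, limited to a subset of the columns. The source values come from the notebook's version-0 output for CustomerKey 1; `build_change_set` is a hypothetical helper, and `uuid.uuid4()` stands in for T-SQL `NEWID()` (both produce a 36-character GUID string; `.upper()` matches the uppercase text NEWID produces when cast to varchar).

```python
import uuid

# Row for CustomerKey 1 as shown in the version-0 output (subset of columns)
source_row = {"CustomerKey": 1, "WWICustomerID": 1,
              "Customer": "Tailspin Toys (Head Office)", "LineageKey": 2}


def build_change_set(row):
    """Hypothetical mirror of the query: one modified row UNION one new row."""
    changed = dict(row)  # copy so the source row stays untouched
    # Like Customer + cast(NEWID() as varchar(100)): append a random GUID
    changed["Customer"] = row["Customer"] + str(uuid.uuid4()).upper()
    dummy = {"CustomerKey": 1000, "WWICustomerID": 1000,
             "Customer": "DUMMY CUSTOMER", "LineageKey": 2}
    return [changed, dummy]


for r in build_change_set(source_row):
    print(r["CustomerKey"], r["Customer"])
```

Because the GUID differs on every run, each execution of the query produces a different Customer value for key 1.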

<h3>We then merge the change set into the Delta table: matching keys are updated, new keys are inserted

In [64]:
%%pyspark
import delta

DeltaTable = delta.DeltaTable.forPath(spark, "/delta/CustomerDimension/")

(DeltaTable
    .alias("DeltaCustomers")
    .merge(df2.alias("UpdatedCustomers"),
           "DeltaCustomers.CustomerKey == UpdatedCustomers.CustomerKey")
    # Key already exists: overwrite only the Customer column
    .whenMatchedUpdate(set={"Customer": "UpdatedCustomers.Customer"})
    # Key not found: insert the complete source row
    .whenNotMatchedInsert(values={
        "CustomerKey": "UpdatedCustomers.CustomerKey",
        "WWICustomerID": "UpdatedCustomers.WWICustomerID",
        "Customer": "UpdatedCustomers.Customer",
        "BillToCustomer": "UpdatedCustomers.BillToCustomer",
        "Category": "UpdatedCustomers.Category",
        "BuyingGroup": "UpdatedCustomers.BuyingGroup",
        "PrimaryContact": "UpdatedCustomers.PrimaryContact",
        "PostalCode": "UpdatedCustomers.PostalCode",
        "ValidFrom": "UpdatedCustomers.ValidFrom",
        "ValidTo": "UpdatedCustomers.ValidTo",
        "LineageKey": "UpdatedCustomers.LineageKey",
    })
    .execute()
)

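The MERGE clauses above boil down to a keyed upsert. The sketch below models that logic in plain Python on dictionaries keyed by CustomerKey; it is only an illustration of the semantics, not how Delta executes a merge, and `merge_customers` plus the `-GUID` suffix are hypothetical placeholders.

```python
def merge_customers(target, updates):
    """Pure-Python model of the MERGE above, keyed on CustomerKey.

    target:  dict mapping CustomerKey -> row dict (the Delta table)
    updates: list of row dicts (the change set df2)
    """
    result = {key: dict(row) for key, row in target.items()}  # leave input untouched
    for row in updates:
        key = row["CustomerKey"]
        if key in result:
            # whenMatchedUpdate: only the Customer column is overwritten
            result[key]["Customer"] = row["Customer"]
        else:
            # whenNotMatchedInsert: the whole source row is inserted
            result[key] = dict(row)
    return result


target = {
    0: {"CustomerKey": 0, "Customer": "Unknown", "Category": "N/A"},
    1: {"CustomerKey": 1, "Customer": "Tailspin Toys (Head Office)", "Category": "Novelty Shop"},
}
updates = [
    # Hypothetical values; Category here is ignored because only Customer is updated
    {"CustomerKey": 1, "Customer": "Tailspin Toys (Head Office)-GUID", "Category": "Ignored"},
    {"CustomerKey": 1000, "Customer": "DUMMY CUSTOMER", "Category": "DUMMY CUSTOMER"},
]
merged = merge_customers(target, updates)
print(merged[1])       # Customer changed, Category still "Novelty Shop"
print(sorted(merged))  # [0, 1, 1000]
```

One difference from the real operation: Delta raises an error when several source rows match the same target row, whereas this dictionary model would silently keep the last one.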



<h3> As the query below shows, the Customer value for key 1 now has a unique identifier appended, and the new row with key 1000 has been added

In [65]:
%%sql

select * from wwi_spark.customerdimension where CustomerKey = 1 or CustomerKey = 1000


<Spark SQL result set with 2 rows and 11 fields>

<h3> The history() method shows which operations were applied to the table and which versions exist (the version number is the first column)

In [69]:
%%pyspark 
DeltaTable.history().show(20, 1000, False)


+-------+-------------------+------+--------+---------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|          timestamp|userId|userName|operation|                                                           operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|                                                                                                                                                                                                operationMetrics|
+-------+-------------------+------+--------+---------+------------------------------------------------------------------------------+----+--------+---------+-----------+------------

<h3> We can use the versionAsOf read option (time travel) to query the table as it was at an earlier version; version 0 shows the data before the merge

In [73]:
%%pyspark
History = spark.read.format("delta").option("versionAsOf", 0).load("/delta/CustomerDimension/")
History.show(2,False)


+-----------+-------------+---------------------------+---------------------------+------------+-------------+--------------+----------+-------------------+--------------------------+----------+
|CustomerKey|WWICustomerID|Customer                   |BillToCustomer             |Category    |BuyingGroup  |PrimaryContact|PostalCode|ValidFrom          |ValidTo                   |LineageKey|
+-----------+-------------+---------------------------+---------------------------+------------+-------------+--------------+----------+-------------------+--------------------------+----------+
|0          |0            |Unknown                    |N/A                        |N/A         |N/A          |N/A           |N/A       |2013-01-01 00:00:00|9999-12-31 23:59:59.999999|0         |
|1          |1            |Tailspin Toys (Head Office)|Tailspin Toys (Head Office)|Novelty Shop|Tailspin Toys|Waldemar Fisar|90410     |2013-01-01 00:00:00|9999-12-31 23:59:59.999999|2         |
+-----------+------------
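Time travel works because each commit in `_delta_log` records which data files it adds and which it removes, so the snapshot at version N is the set of files added and not yet removed in commits 0 through N. A simplified pure-Python model of that replay follows; real Delta also uses checkpoints and tracks metadata and protocol actions, and the two-commit log and file names below are hypothetical.

```python
def files_as_of(commits, version):
    """Replay add/remove actions of commits 0..version; return the live data files."""
    live = set()
    for commit in commits[: version + 1]:
        # Drop the files this commit removed, then add the files it wrote
        live -= set(commit.get("remove", []))
        live |= set(commit.get("add", []))
    return live


# Hypothetical log: an initial WRITE, then a MERGE that rewrites the data file
commits = [
    {"operation": "WRITE", "add": ["part-00000.parquet"]},
    {"operation": "MERGE", "remove": ["part-00000.parquet"],
     "add": ["part-00001.parquet"]},
]

print(sorted(files_as_of(commits, 0)))  # ['part-00000.parquet']
print(sorted(files_as_of(commits, 1)))  # ['part-00001.parquet']
```

A removed file is only dropped logically, which is why reading with `versionAsOf` 0 still works after the merge; it stops working once `VACUUM` physically deletes the old files.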

In [None]:
%%sql
DROP DATABASE wwi_spark CASCADE;
