Sample data set generated from the [Databricks Data Generator](https://databrickslabs.github.io/dbldatagen/public_docs/index.html)

In [0]:
%pip install dbldatagen

Python interpreter will be restarted.
Collecting dbldatagen
  Using cached dbldatagen-0.3.0-py3-none-any.whl (69 kB)
Installing collected packages: dbldatagen
Successfully installed dbldatagen-0.3.0
Python interpreter will be restarted.


In [0]:
import random
hash = random.getrandbits(128)

BASE_PATH = '/tmp/dml_sample/merge/' + str(hash) + '/'

dbutils.fs.mkdirs(BASE_PATH)
customers_location = BASE_PATH + "customers"

print(customers_location)

/tmp/dml_sample/merge/9504141379420501870294217876424901151/customers


In [0]:
# Number of Rows of customer data
data_rows = 10000000
change_set_size = 10000

In [0]:
import dbldatagen as dg
import pyspark.sql.functions as F

spark.catalog.clearCache()
shuffle_partitions_requested = 8
partitions_requested = 32

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)

dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
            .withColumn("customer_id","long", uniqueValues=data_rows)
            .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
            .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
            .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard',
                        'American Express', 'discover', 'branded visa', 'branded mastercard'],
                        random=True, distribution="normal")
            .withColumn("int_payment_instrument", "int",  minValue=0000, maxValue=9999,  baseColumn="customer_id",
                        baseColumnType="hash", omit=True)
            .withColumn("payment_instrument", expr="format_number(int_payment_instrument, '**** ****** *####')",
                        baseColumn="int_payment_instrument")
            .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w')
            .withColumn("customer_notes", text=dg.ILText(words=(1,8)))
            .withColumn("created_ts", "timestamp", expr="now()")
            .withColumn("modified_ts", "timestamp", expr="now()")
            .withColumn("memo", expr="'original data'")
            )
df1 = dataspec.build()

# write table
df1.write.format("delta").save(customers_location)

INFO: Version : VersionInfo(major='0', minor='3', patch='0', release='', build='')


In [0]:
tableDefn=dataspec.scriptTable(name="customers", location=customers_location)
spark.sql(tableDefn)
tableDefn

Out[4]: "CREATE TABLE IF NOT EXISTS customers (\n    customer_id bigint,\n    name string,\n    alias string,\n    payment_instrument_type string,\n    payment_instrument string,\n    email string,\n    customer_notes string,\n    created_ts timestamp,\n    modified_ts timestamp,\n    memo string\n)\nusing delta\nlocation '/tmp/dml_sample/merge/9504141379420501870294217876424901151/customers'"

In [0]:
%sql
-- lets check our table

SELECT * 
FROM customers
LIMIT 10

customer_id,name,alias,payment_instrument_type,payment_instrument,email,customer_notes,created_ts,modified_ts,memo
1,ea m. consequat,quis w. consequat,American Express,**** ****** *669,eiusmod.sed@cupidatat.com,Incididunt magna in in quis eiusmod anim occaecat.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
2,labore eiusmod,do z. est,discover,**** ****** *2728,quis-quis@veniam,Culpa adipiscing cillum.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
3,dolor deserunt,ea x. cillum,American Express,**** ****** *707,pariatur.dolor@culpa.com,Duis.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
4,anim n. qui,laborum c. dolore,American Express,**** ****** *3940,ut.qui@id.com,Aute eiusmod cillum excepteur sint pariatur irure commodo.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
5,amet o. in,consectetur t. anim,discover,**** ****** *4268,eiusmod.ad@excepteur.com,Laborum duis labore sint quis.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
6,amet commodo,dolor culpa,discover,**** ****** *5445,sit.non@ipsum.com,Velit duis eiusmod.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
7,consequat v. est,sint p. dolore,American Express,**** ****** *6811,ut-ad@aute,Eu.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
8,dolor u. sint,ex dolore,discover,**** ****** *5916,nulla.sunt@officia.com,Enim.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
9,enim est,dolore v. tempor,Mastercard,**** ****** *3649,aliqua.qui@ea.com,Cillum exercitation consectetur.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data
10,incididunt c. reprehenderit,nostrud elit,Mastercard,**** ****** *5677,cillum.consequat@adipiscing.com,Tempor anim minim ex dolor velit nulla culpa.,2023-02-03T21:49:31.462+0000,2023-02-03T21:49:31.462+0000,original data


In [0]:
import pandas as pd
  
# list of strings
lst = dbutils.fs.ls(customers_location)
  
# Calling DataFrame constructor on list
before_merge_df = pd.DataFrame(lst)
before_merge_df = before_merge_df.loc[before_merge_df["name"] != "_delta_log/"]
before_merge_df



Unnamed: 0,path,name,size,modificationTime
1,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00000-317715a4-05d3-4860-a86d-d45777b67d8...,9768340,1675461002000
2,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00001-b3dff13c-1b7b-4efc-bca6-efd43d68f72...,9764235,1675461002000
3,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00002-565492ef-4119-41f8-81f6-9117bcf120b...,9765205,1675461003000
4,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00003-726bdb90-6c6e-4fc6-a386-8bd59a80d46...,9770721,1675461002000
5,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00004-d9bc23bc-098d-4efa-a263-f421aa6135c...,9766729,1675461003000
6,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00005-7ba315e2-8912-47e9-9c09-b838abf6a79...,9764446,1675461003000
7,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00006-8e07fa9b-ef87-4204-b494-38223d54cd6...,9769161,1675461002000
8,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00007-9bd537a6-dee6-4499-9716-37fdeabdb7c...,9768569,1675461002000
9,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00008-acc91699-6692-4f0e-a7ef-f4298d7a27c...,9767624,1675461003000
10,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00009-decf9648-ea01-4732-a398-c61f5c51602...,9768866,1675461002000


In [0]:
import dbldatagen as dg
import pyspark.sql.functions as F

start_of_new_ids = df1.select(F.max('customer_id')+1).collect()[0][0]

print(start_of_new_ids)

df1_inserts = (dataspec.clone()
        .option("startingId", start_of_new_ids)
        .withRowCount(round(change_set_size * 0.2))
        .build()
        .withColumn("memo", F.lit("insert"))
        .withColumn("customer_id", F.expr(f"customer_id + {start_of_new_ids}"))
              )

# read the written data - if we simply recompute, timestamps of original will be lost
df_original = spark.read.format("delta").load(customers_location)

df1_updates = (df_original.sample(False, 0.1)
        .limit(round(change_set_size * 0.8))
        .withColumn("alias", F.lit('modified alias'))
        .withColumn("modified_ts",F.expr('now()'))
        .withColumn("memo", F.lit("update")))

df_changes = df1_inserts.union(df1_updates)

# randomize ordering
df_changes = (df_changes.withColumn("order_rand", F.expr("rand()"))
              .orderBy("order_rand")
              .drop("order_rand")
              )


10000001


In [0]:
df_changes.createOrReplaceTempView("change_set")

In [0]:
%sql

SELECT *
FROM change_set
LIMIT 10

customer_id,name,alias,payment_instrument_type,payment_instrument,email,customer_notes,created_ts,modified_ts,memo
6007812,dolore labore,modified alias,American Express,**** ****** *7504,eu-ut@in,Fugiat amet.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
10000994,do v. magna,eu dolore,branded visa,**** ****** *9092,commodo-nulla@excepteur,Velit aute dolore ut.,2023-02-03T21:51:06.083+0000,2023-02-03T21:51:06.083+0000,insert
6018353,in q. qui,modified alias,American Express,**** ****** *9866,sint.incididunt@dolore.com,Irure excepteur occaecat incididunt do sit nisi dolore.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
5942867,exercitation nostrud,modified alias,American Express,**** ****** *5605,minim-in@deserunt,Qui pariatur.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
6016454,laboris f. nulla,modified alias,discover,**** ****** *5300,sit.ad@nostrud.com,Ipsum non deserunt laborum do do excepteur.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
10001154,eiusmod qui,non eiusmod,paypal,**** ****** *904,ut-ad@est,Lorem culpa sit officia minim laboris voluptate sit.,2023-02-03T21:51:06.083+0000,2023-02-03T21:51:06.083+0000,insert
5951088,eiusmod s. aliqua,modified alias,discover,**** ****** *9212,tempor-enim@magna,Eiusmod quis duis labore reprehenderit quis fugiat.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
5976462,qui qui,modified alias,Mastercard,**** ****** *3598,sed.esse@culpa.com,Ut dolore dolor exercitation non magna id.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
5945741,ipsum enim,modified alias,American Express,**** ****** *5894,sit.id@adipiscing.com,Aute tempor sed eiusmod laborum duis culpa officia.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:06.083+0000,update
10000082,fugiat i. eiusmod,laborum g. dolor,discover,**** ****** *8164,labore-culpa@reprehenderit,Enim proident ipsum cupidatat eiusmod.,2023-02-03T21:51:06.083+0000,2023-02-03T21:51:06.083+0000,insert


In [0]:
%sql

SELECT memo, count(memo)
FROM change_set
GROUP BY memo

memo,count(memo)
insert,2000
update,8000


In [0]:
df_changes.dropDuplicates(["customer_id"]).createOrReplaceTempView("customers1_changes")
sqlStmt = dataspec.scriptMerge(tgtName="customers", srcName="changes",
                               joinExpr="src.customer_id=tgt.customer_id",
                               updateColumns=["alias", "memo","modified_ts"],
                               updateColumnExprs=[ ("memo", "'updated on merge'"),
                                                   ("modified_ts", "now()")
                                                 ])

print(sqlStmt)

MERGE INTO `customers` as tgt
USING `changes` as src
ON src.customer_id=tgt.customer_id
WHEN MATCHED THEN UPDATE  SET alias=src.alias, memo='updated on merge', modified_ts=now()
WHEN NOT MATCHED THEN INSERT (customer_id,name,alias,payment_instrument_type,payment_instrument,email,customer_notes,created_ts,modified_ts,memo) VALUES (src.customer_id, src.name, src.alias, src.payment_instrument_type, src.payment_instrument, src.email, src.customer_notes, src.created_ts, src.modified_ts, src.memo)


In [0]:
%sql

MERGE INTO `customers` as tgt
  USING `change_set` as src
  ON src.customer_id=tgt.customer_id
  WHEN MATCHED THEN 
    UPDATE SET alias=src.alias, memo='updated on merge', modified_ts=now()
  WHEN NOT MATCHED THEN 
    INSERT (customer_id,name,alias,payment_instrument_type,payment_instrument,email,customer_notes,created_ts,modified_ts,memo) VALUES (src.customer_id, src.name, src.alias, src.payment_instrument_type, src.payment_instrument, src.email, src.customer_notes, src.created_ts, src.modified_ts, src.memo)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10000,8000,0,2000


For a small change data set relative to your source files, the change set is isolated to a single smaller file.  This allows the ordering and previous files to stay the same.  No major shuffling of pre-existing files. 

For a larger change data set, there is more shuffling and files that are added.  May require an OPTIMIZE command to be executed.

In [0]:
# list of strings
lst = dbutils.fs.ls(customers_location)
  
# Calling DataFrame constructor on list
after_merge_df = pd.DataFrame(lst)
after_merge_df = after_merge_df.loc[after_merge_df["name"] != "_delta_log/"]
after_merge_df

Unnamed: 0,path,name,size,modificationTime
1,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00000-317715a4-05d3-4860-a86d-d45777b67d8...,9768340,1675461002000
2,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00000-9a3fbd2d-beac-47d4-90e6-589b3082ac7...,390536,1675461072000
3,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00000-d194fbbd-8c4b-4d2e-a5b0-1c35d06fd49...,9481674,1675461074000
4,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00001-b3dff13c-1b7b-4efc-bca6-efd43d68f72...,9764235,1675461002000
5,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00002-565492ef-4119-41f8-81f6-9117bcf120b...,9765205,1675461003000
6,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00003-726bdb90-6c6e-4fc6-a386-8bd59a80d46...,9770721,1675461002000
7,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00004-d9bc23bc-098d-4efa-a263-f421aa6135c...,9766729,1675461003000
8,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00005-7ba315e2-8912-47e9-9c09-b838abf6a79...,9764446,1675461003000
9,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00006-8e07fa9b-ef87-4204-b494-38223d54cd6...,9769161,1675461002000
10,dbfs:/tmp/dml_sample/merge/9504141379420501870...,part-00007-9bd537a6-dee6-4499-9716-37fdeabdb7c...,9768569,1675461002000


In [0]:
%sql

SELECT COUNT(*)
FROM CUSTOMERS


count(1)
10002000


In [0]:
%sql

SELECT *
FROM CUSTOMERS
WHERE memo = "updated on merge"
LIMIT 10

customer_id,name,alias,payment_instrument_type,payment_instrument,email,customer_notes,created_ts,modified_ts,memo
6006826,in r. voluptate,modified alias,American Express,**** ****** *3870,anim.fugiat@aliquip.com,Consequat aliquip.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
6017639,est s. consequat,modified alias,Mastercard,**** ****** *6000,ipsum-eu@officia,Excepteur minim minim dolor commodo proident in esse.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
5943030,cupidatat veniam,modified alias,discover,**** ****** *1703,culpa-nisi@ad,Esse exercitation nisi labore.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
6015737,tempor do,modified alias,Mastercard,**** ****** *633,irure-exercitation@velit,Ullamco cillum et.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
5950731,ullamco amet,modified alias,American Express,**** ****** *6316,irure.magna@exercitation.com,Pariatur irure cupidatat voluptate laboris.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
5976425,ad cillum,modified alias,Visa,**** ****** *4426,proident.reprehenderit@reprehenderit.com,Ad est fugiat exercitation ullamco.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
5945660,laborum ut,modified alias,American Express,**** ****** *3948,pariatur-velit@ut,Veniam non in.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
5975109,aute e. consectetur,modified alias,discover,**** ****** *543,aliqua-amet@eiusmod,Sed aliqua in sit aute veniam sint aute.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
6010710,lorem dolor,modified alias,American Express,**** ****** *3403,sunt-tempor@quis,Eu culpa velit.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge
6001394,cupidatat d. dolore,modified alias,discover,**** ****** *2580,sint-nostrud@irure,Ad.,2023-02-03T21:49:31.462+0000,2023-02-03T21:51:08.381+0000,updated on merge


In [0]:
%sql 

DROP TABLE customers

In [0]:
dbutils.fs.rm(customers_location, True)

Out[10]: True