# In [1]:
# Hudi table name used by every write below.
tableName = "customers"

# In [2]:
# Root path; each experiment writes its own table underneath it
# (/upsert, /upsert_multi, /insert).
basePath = "file:///tmp/customers"

# In [3]:
# Seed rows as (customerId, address, city, createdOn). Customer 1 has two
# versions so the per-customer window in processData() links them.
data = [
    (1, "old address for 1", "durham", '2020-01-01 02:34:54'),
    (1, "current address for 1", "durham", '2020-01-02 02:27:34'),
    (2, "current address for 2", "cary", '2020-01-01 07:34:53'),
    (3, "current address for 3", "chapel hil", '2020-01-01 04:26:15'),
]

# In [4]:
# NOTE(review): `spark` is not defined in this file — presumably the
# SparkSession provided by the notebook / pyspark shell.
full = spark.createDataFrame(
    data, schema=['customerId', 'address', 'city', 'createdOn']
)

# In [5]:
full.show()

# In [6]:
# Column identifying a customer; partitions the history window.
pkey = "customerId"

# In [7]:
# Column that orders a customer's rows; also the start/end date source.
definingCol = "createdOn"

# In [8]:
# Non-key columns; hashed into hashData via hashDataCols.
dataKeys = ["address", "city"]

# In [9]:
# Columns hashed into hashKey (the Hudi record key in the upsert tables).
hashKeyCols = ["customerId"]

# In [10]:
# Copied rather than aliased so mutating one list cannot silently change
# the other.
hashDataCols = list(dataKeys)

# In [11]:
def processData(df, *, key_col=None, order_col=None, key_cols=None, data_cols=None):
    """Return ``df`` with hash-key and validity-interval columns added.

    Added columns:

    * ``hashKey``   - SHA-256 (hex) of the ``key_cols`` values joined by ``"|"``.
    * ``hashData``  - SHA-256 (hex) of the ``data_cols`` values joined by ``"|"``.
    * ``startDate`` - copy of ``order_col``.
    * ``endDate``   - ``order_col`` of the next row in the same ``key_col``
      partition when ordered by ``order_col``; null for the last row.

    Args:
        df: Input Spark DataFrame.
        key_col: Column to partition the window by. Defaults to the
            notebook global ``pkey``.
        order_col: Column that orders rows within a partition. Defaults to
            the notebook global ``definingCol``.
        key_cols: Columns hashed into ``hashKey``. Defaults to the notebook
            global ``hashKeyCols``.
        data_cols: Columns hashed into ``hashData``. Defaults to the notebook
            global ``hashDataCols``.

    Defaults are resolved at call time, so ``processData(df)`` behaves
    exactly as before.
    """
    # Imported locally so the function works even if it is called before
    # the notebook's pyspark import cells have run.
    from pyspark.sql import Window
    from pyspark.sql.functions import col, concat_ws, lead, sha2

    key_col = pkey if key_col is None else key_col
    order_col = definingCol if order_col is None else order_col
    key_cols = hashKeyCols if key_cols is None else key_cols
    data_cols = hashDataCols if data_cols is None else data_cols

    def _sha256_of(cols):
        # Hash the "|"-joined values of `cols`.
        # NOTE(review): concat_ws skips nulls, so e.g. (null, "x") and
        # ("x", null) hash identically; coalesce inputs first if these
        # columns can be null.
        return sha2(concat_ws("|", *map(col, cols)), 256)

    window = Window.partitionBy(col(key_col)).orderBy(col(order_col))
    return (
        df.withColumn("hashKey", _sha256_of(key_cols))
        .withColumn("hashData", _sha256_of(data_cols))
        .withColumn("startDate", col(order_col))
        .withColumn("endDate", lead(col(order_col)).over(window))
    )

# In [14]-[17]:
# Imports are kept ahead of the first processData() call so the pyspark
# names are in scope before any Spark helper runs. A misspelled duplicate
# (`psypark.sql.functions`) failed to import and has been dropped.
# NOTE: the functions star import shadows Python builtins such as sum, max,
# min, abs and round with Spark column functions.
from pyspark.sql import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

# In [18]:
# Add hash and validity-interval columns to the seed data. Earlier attempts
# (cells [12]/[13]) ran before the imports, and [12] passed the undefined
# name `df`.
df = processData(full)

# In [20]:
df.show()

# In [21], [22], [25]:
# Hudi write options. Settings shared by all three tables live in one dict;
# each variant adds only its record key, operation and any extras.
_hudi_common_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.precombine.field': 'createdOn',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
}

# Upsert keyed on hashKey alone.
hudi_options_upsert = {
    **_hudi_common_options,
    'hoodie.datasource.write.recordkey.field': 'hashKey',
    'hoodie.datasource.write.operation': 'upsert',
}

# Upsert keyed on the (hashKey, hashData) pair. Hudi documents
# ComplexKeyGenerator as the key generator for multi-field record keys;
# without it, releases that do not infer the generator treat
# 'hashKey,hashData' as a single field name. TODO: confirm against the Hudi
# version in use.
hudi_options_upsert_multi = {
    **_hudi_common_options,
    'hoodie.datasource.write.recordkey.field': 'hashKey,hashData',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.ComplexKeyGenerator',
}

# Plain insert keyed on hashKey.
hudi_options_insert = {
    **_hudi_common_options,
    'hoodie.datasource.write.recordkey.field': 'hashKey',
    'hoodie.datasource.write.operation': 'insert',
}

# In [23]:
# Initial load of each table; mode "overwrite" replaces any earlier run.
(
    df.write.format("hudi")
    .options(**hudi_options_upsert)
    .mode("overwrite")
    .save(basePath + '/upsert')
)

# In [24]:
(
    df.write.format("hudi")
    .options(**hudi_options_upsert_multi)
    .mode("overwrite")
    .save(basePath + '/upsert_multi')
)

# In [26]:
(
    df.write.format("hudi")
    .options(**hudi_options_insert)
    .mode("overwrite")
    .save(basePath + '/insert')
)

# In [27]-[28]:
# Snapshot read of the upsert table.
up1 = spark.read.format("hudi").load(basePath + "/upsert/default")
up1.show()

# In [29]-[35]:
# Incremental read of the upsert table from begin instant '20170901080000'.
# The query needs both options and a final .load(); without .load(), `up2`
# is still a DataFrameReader, which has no .show().
up2 = (
    spark.read.format("hudi")
    .option('hoodie.datasource.query.type', 'incremental')
    .option('hoodie.datasource.read.begin.instanttime', '20170901080000')
    .load(basePath + "/upsert/default")
)
up2.show()

# In [37]-[38]:
# Snapshot read of the insert table. It lives under /insert, not
# /upsert/insert.
ip = spark.read.format("hudi").load(basePath + "/insert/default")
ip.show()

# In [39]:
# Second batch: a new address for customer 1, an update for customer 3 and
# a new customer 4.
# NOTE(review): "02:34:65" is not a valid time of day (seconds > 59). It is
# harmless while createdOn stays a string column, but a cast to timestamp
# would not parse it. Confirm the intended value.
# NOTE(review): these createdOn values (2018-…) are older than the seed
# rows (2020-…). Confirm that is intended, given createdOn is the precombine
# field.
incData = [
    (1, "new address for 1", "durham", "2018-03-03 02:34:65"),
    (3, "current address for 3 update", "apex", "2018-04-04 09:32:23"),
    (4, "new address for 4", "Raleigh", "2018-04-04 04:49:22"),
]

# In [40]:
inc = spark.createDataFrame(
    incData, schema=['customerId', 'address', "city", 'createdOn']
)

# In [41]-[42]:
incDF = processData(inc)
incDF.show()

# In [43]:
# Upsert the batch into the upsert table.
(
    incDF.write.format("hudi")
    .options(**hudi_options_upsert)
    .mode("append")
    .save(basePath + '/upsert')
)

# In [44]:
# Append the batch to the insert table with the insert options it was
# created with. Reusing hudi_options_upsert here would make it behave like
# the upsert table and defeat the comparison below.
(
    incDF.write.format("hudi")
    .options(**hudi_options_insert)
    .mode("append")
    .save(basePath + '/insert')
)

# In [45]-[48]:
# Read both tables back to compare upsert vs insert results.
up = spark.read.format("hudi").load(basePath + "/upsert/default")
ip = spark.read.format("hudi").load(basePath + "/insert/default")
up.show()
ip.show()