In [1]:
from pyspark.sql import Row
from pyspark.sql.utils import AnalysisException
# Seed catalogue as (product_id, product_name, stocks_left, row_flag) tuples;
# every seed row carries row_flag 'I'.
l = [
    (1001, 'Apple Mac Book', 11, 'I'),
    (1002, 'Apple iPhone 11', 22, 'I'),
    (1003, 'Redmi Note 4', 33, 'I'),
    (1004, 'Dell Inspiron', 44, 'I'),
]


def to_product_row(record):
    """Turn one seed tuple into a Row with named fields, casting the numeric fields to int."""
    product_id, product_name, stocks_left, row_flag = record
    return Row(
        product_id=int(product_id),
        product_name=product_name,
        stocks_left=int(stocks_left),
        row_flag=row_flag,
    )


rdd = sc.parallelize(l)
schemaRdd = rdd.map(to_product_row)
# The DataFrame schema is inferred from the Row objects.
product_df = spark.createDataFrame(schemaRdd)
display(product_df)

product_id,product_name,row_flag,stocks_left
1001,Apple Mac Book,I,11
1002,Apple iPhone 11,I,22
1003,Redmi Note 4,I,33
1004,Dell Inspiron,I,44


In [2]:
# Database that holds both demo tables; later cells build the
# fully qualified names as f'{database_name}.{<table name>}'.
database_name = 'delta_demo'
# The same product data is saved once as Delta and once as plain Parquet
# so the two formats can be compared under identical writes.
delta_table_name = 'product_table_delta'
parquet_table_name = 'product_table_parquet'

In [3]:
# saveAsTable does not create the target database, so make sure it exists
# before the first write; IF NOT EXISTS keeps this cell safe to re-run.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Baseline copy of the product data as a plain Parquet table.
# mode("overwrite") replaces whatever an earlier run left behind.
(product_df
  .repartition(2)
  .write
  .mode("overwrite")
  .format("parquet")
  .saveAsTable(f'{database_name}.{parquet_table_name}')
)

# Same rows as a Delta table. overwriteSchema lets the overwrite replace the
# stored table schema as well as the data, so re-running after the schema
# experiments below resets the table to the original four columns.
(product_df
  .repartition(2)
  .write
  .mode("overwrite")
  .option("overwriteSchema", True)
  .format("delta")
  .saveAsTable(f'{database_name}.{delta_table_name}')
)

In [4]:
%sql

-- Column list plus the detailed table information block for the Delta table
DESCRIBE FORMATTED delta_demo.product_table_delta

col_name,data_type,comment
product_id,bigint,
product_name,string,
row_flag,string,
stocks_left,bigint,
,,
# Detailed Table Information,,
Database,delta_demo,
Table,product_table_delta,
Owner,root,
Created Time,Sun Jun 28 07:40:09 UTC 2020,


In [5]:
# List the Delta transaction log directory of the table
# (Python form of the `%fs ls` shorthand).
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log"))

path,name,size
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-0,.s3-optimization-0,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-1,.s3-optimization-1,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-2,.s3-optimization-2,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,3287
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,3414
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,91
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,2700
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,91


In [6]:
# Read the Parquet copy back through Spark SQL to confirm the initial load.
sqlCmd = f"SELECT * FROM {database_name}.{parquet_table_name}"
parquet_initial_df = spark.sql(sqlCmd)
display(parquet_initial_df)

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22
1001,Apple Mac Book,I,11
1004,Dell Inspiron,I,44
1003,Redmi Note 4,I,33


In [7]:
# Read the Delta copy back through Spark SQL to confirm the initial load.
sqlCmd = f"SELECT * FROM {database_name}.{delta_table_name}"
delta_initial_df = spark.sql(sqlCmd)
display(delta_initial_df)

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22
1001,Apple Mac Book,I,11
1004,Dell Inspiron,I,44
1003,Redmi Note 4,I,33


In [8]:
# Same four products, but stocks_left now arrives as mixed values
# ('eleven', 22.52, 33, 44). str() below turns every one of them into text,
# so the column type differs from the integer column written earlier.
l = [
    (1001, 'Apple Mac Book', 'eleven', 'I'),
    (1002, 'Apple iPhone 11', 22.52, 'I'),
    (1003, 'Redmi Note 4', 33, 'I'),
    (1004, 'Dell Inspiron', 44, 'I'),
]


def to_text_stock_row(record):
    """Turn one tuple into a Row whose stocks_left field is forced to a string."""
    product_id, product_name, stocks_left, row_flag = record
    return Row(
        product_id=int(product_id),
        product_name=product_name,
        stocks_left=str(stocks_left),
        row_flag=row_flag,
    )


rdd = sc.parallelize(l)
schemaRdd = rdd.map(to_text_stock_row)
type_changed_product_df = spark.createDataFrame(schemaRdd)
display(type_changed_product_df)

product_id,product_name,row_flag,stocks_left
1001,Apple Mac Book,I,eleven
1002,Apple iPhone 11,I,22.52
1003,Redmi Note 4,I,33
1004,Dell Inspiron,I,44


In [9]:
# Append the batch whose stocks_left is text to the Parquet table.
type_changed_parquet_writer = (
    type_changed_product_df
    .repartition(2)
    .write
    .format("parquet")
    .mode("append")
)
type_changed_parquet_writer.saveAsTable(f'{database_name}.{parquet_table_name}')

In [10]:
# Schema enforcement demo: the Delta table stores stocks_left as bigint, while
# this batch carries it as a string. The append is expected to be refused (the
# read-back that follows shows none of these rows in the table), so the error
# is caught and printed. That keeps the demonstration visible without
# stopping a full "Run All".
try:
    (type_changed_product_df
      .repartition(2)
      .write
      .mode("append")
      .format("delta")
      .saveAsTable(f'{database_name}.{delta_table_name}')
    )
except AnalysisException as schema_error:
    # Only analysis errors (such as a schema mismatch) are handled here;
    # anything else still propagates.
    print(f"Delta rejected the append: {schema_error}")

In [11]:
# Inspect the Delta table after the attempted append of the string-typed batch.
sqlCmd = f"SELECT * FROM {database_name}.{delta_table_name}"
delta_after_type_change_df = spark.sql(sqlCmd)
display(delta_after_type_change_df)

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22
1002,Apple iPhone 11,I,22
1001,Apple Mac Book,I,11
1001,Apple Mac Book,I,11
1004,Dell Inspiron,I,44
1004,Dell Inspiron,I,44
1003,Redmi Note 4,I,33
1003,Redmi Note 4,I,33


In [12]:
# Inspect the Parquet table after the append of the string-typed batch.
sqlCmd = f"SELECT * FROM {database_name}.{parquet_table_name}"
parquet_after_type_change_df = spark.sql(sqlCmd)
display(parquet_after_type_change_df)

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1004,Dell Inspiron,I,44.0


In [15]:
# The original four products with one extra trailing field, used to exercise
# adding a column (new_col) to the existing tables.
l = [
    (1001, 'Apple Mac Book', 11, 'I', 'new_1'),
    (1002, 'Apple iPhone 11', 22, 'I', 'new_2'),
    (1003, 'Redmi Note 4', 33, 'I', 'new_3'),
    (1004, 'Dell Inspiron', 44, 'I', 'new_4'),
]


def to_extended_product_row(record):
    """Turn one five-field tuple into a Row that includes the additional new_col field."""
    product_id, product_name, stocks_left, row_flag, new_col = record
    return Row(
        product_id=int(product_id),
        product_name=product_name,
        stocks_left=int(stocks_left),
        row_flag=row_flag,
        new_col=new_col,
    )


rdd = sc.parallelize(l)
schemaRdd = rdd.map(to_extended_product_row)
new_col_product_df = spark.createDataFrame(schemaRdd)
display(new_col_product_df)

new_col,product_id,product_name,row_flag,stocks_left
new_1,1001,Apple Mac Book,I,11
new_2,1002,Apple iPhone 11,I,22
new_3,1003,Redmi Note 4,I,33
new_4,1004,Dell Inspiron,I,44


In [16]:
# Print the inferred schema of the five-column batch before trying to append it.
new_col_product_df.printSchema()

In [17]:
# Append the five-column batch (with new_col) to the Parquet table.
new_col_parquet_writer = (
    new_col_product_df
    .repartition(2)
    .write
    .format("parquet")
    .mode("append")
)
new_col_parquet_writer.saveAsTable(f'{database_name}.{parquet_table_name}')

In [18]:
# Schema enforcement demo: new_col_product_df carries a column (new_col) that
# the Delta table does not have, and no mergeSchema option is set here. The
# append is expected to be refused, so the error is caught and printed
# instead of stopping a full "Run All".
try:
    (new_col_product_df
      .repartition(2)
      .write
      .mode("append")
      .format("delta")
      .saveAsTable(f'{database_name}.{delta_table_name}')
    )
except AnalysisException as schema_error:
    # Only analysis errors (such as a schema mismatch) are handled here;
    # anything else still propagates.
    print(f"Delta rejected the append: {schema_error}")

In [19]:
# Schema evolution: same append as before, now with mergeSchema enabled so the
# table schema can gain new_col as part of the write.
new_col_delta_writer = (
    new_col_product_df
    .repartition(2)
    .write
    .format("delta")
    .option("mergeSchema", True)
    .mode("append")
)
new_col_delta_writer.saveAsTable(f'{database_name}.{delta_table_name}')

In [20]:
# Inspect the Parquet table after the five-column append.
sqlCmd = f"SELECT * FROM {database_name}.{parquet_table_name}"
parquet_after_new_col_df = spark.sql(sqlCmd)
display(parquet_after_new_col_df)

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1002,Apple iPhone 11,I,22.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1001,Apple Mac Book,I,11.0
1004,Dell Inspiron,I,44.0


In [21]:
# Inspect the Delta table after the mergeSchema append: rows written before
# the schema change show an empty new_col.
sqlCmd = f"SELECT * FROM {database_name}.{delta_table_name}"
delta_after_new_col_df = spark.sql(sqlCmd)
display(delta_after_new_col_df)

product_id,product_name,row_flag,stocks_left,new_col
1002,Apple iPhone 11,I,22,new_2
1001,Apple Mac Book,I,11,new_1
1004,Dell Inspiron,I,44,new_4
1003,Redmi Note 4,I,33,new_3
1002,Apple iPhone 11,I,22,
1001,Apple Mac Book,I,11,
1004,Dell Inspiron,I,44,
1003,Redmi Note 4,I,33,


### Changing a column’s type or name, or dropping a column, requires rewriting the table. To do this, use the `overwriteSchema` option.

# CDC USE CASE - DML/DDL

In [29]:
# Change set for the CDC demo: three rows flagged 'U' with new stock levels
# and one row flagged 'D' (product 1004).
l = [
    (1001, 'Apple Mac Book', 110, 'U', 'new_1'),
    (1002, 'Apple iPhone 11', 2, 'U', 'new_2'),
    (1003, 'Redmi Note 4', 3, 'U', 'new_3'),
    (1004, 'Dell Inspiron', 0, 'D', 'new_4'),
]


def to_change_row(record):
    """Turn one change-set tuple into a Row with the same five fields as the Delta table."""
    product_id, product_name, stocks_left, row_flag, new_col = record
    return Row(
        product_id=int(product_id),
        product_name=product_name,
        stocks_left=int(stocks_left),
        row_flag=row_flag,
        new_col=new_col,
    )


rdd = sc.parallelize(l)
schemaRdd = rdd.map(to_change_row)
merge_product_df = spark.createDataFrame(schemaRdd)
display(merge_product_df)

new_col,product_id,product_name,row_flag,stocks_left
new_1,1001,Apple Mac Book,U,110
new_2,1002,Apple iPhone 11,U,2
new_3,1003,Redmi Note 4,U,3
new_4,1004,Dell Inspiron,D,0


In [30]:
# Expose the change set to Spark SQL under the name the MERGE statement reads from.
source_view_name = 'source_table'
merge_product_df.createOrReplaceTempView(source_view_name)

In [32]:
# Apply the change set to the Delta table in a single MERGE, keyed on product_id:
#   matched and row_flag 'U'     -> overwrite the target row with the source row
#   matched and row_flag 'D'     -> delete the target row
#   not matched and row_flag 'I' -> insert the source row
merge_statement = """
          MERGE INTO delta_demo.product_table_delta target_table
          USING source_table
          ON source_table.product_id = target_table.product_id
          WHEN MATCHED AND source_table.row_flag = 'U'
          THEN
          UPDATE SET *
          WHEN MATCHED AND source_table.row_flag = 'D'
          THEN DELETE
          WHEN NOT MATCHED and source_table.row_flag = 'I'
          THEN INSERT *
          """
spark.sql(merge_statement)

In [33]:
# Inspect the Delta table after the MERGE.
sqlCmd = f"SELECT * FROM {database_name}.{delta_table_name}"
delta_after_merge_df = spark.sql(sqlCmd)
display(delta_after_merge_df)

product_id,product_name,row_flag,stocks_left,new_col
1002,Apple iPhone 11,U,2,new_2
1002,Apple iPhone 11,U,2,new_2
1001,Apple Mac Book,U,110,new_1
1001,Apple Mac Book,U,110,new_1
1003,Redmi Note 4,U,3,new_3
1003,Redmi Note 4,U,3,new_3


# SHOW I/U/D counts; this feature was not available previously. If possible, also show the Slack channel screenshot

In [36]:
%sql

-- Table history is retained for 30 days
-- One row per table version, including the operation, its parameters and operationMetrics
DESCRIBE HISTORY delta_demo.product_table_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
7,2020-06-28T10:52:13.000+0000,1583409609571278,techieeisland@gmail.com,MERGE,"Map(predicate -> (source_table.`product_id` = target_table.`product_id`), updatePredicate -> (source_table.`row_flag` = 'U'), deletePredicate -> (source_table.`row_flag` = 'D'), insertPredicate -> (source_table.`row_flag` = 'I'))",,List(891795827403111),0628-102648-pick544,6.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 2, numFiles -> 4, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 4, numTargetRowsInserted -> 0, numTargetRowsUpdated -> 6, numOutputRows -> 6, numParts -> 0, numOutputBytes -> 5583, numSourceRows -> 4, numTargetFilesRemoved -> 8, numTargetFilesBeforeSkipping -> 10)"
6,2020-06-28T10:41:40.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(891795827403111),0628-102648-pick544,5.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 6618, numOutputRows -> 4, numParts -> 0)"
5,2020-06-28T10:38:26.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(891795827403111),0628-102648-pick544,4.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 5602, numOutputRows -> 4, numParts -> 0)"
4,2020-06-28T10:36:48.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(891795827403111),0628-102648-pick544,3.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 5602, numOutputRows -> 4, numParts -> 0)"
3,2020-06-28T08:17:46.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(891795827403111),0628-070251-hove428,2.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 6618, numOutputRows -> 4, numParts -> 0)"
2,2020-06-28T08:00:00.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(891795827403111),0628-070251-hove428,1.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 5602, numOutputRows -> 4, numParts -> 0)"
1,2020-06-28T07:44:11.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(891795827403111),0628-070251-hove428,0.0,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 5602, numOutputRows -> 4, numParts -> 0)"
0,2020-06-28T07:40:12.000+0000,1583409609571278,techieeisland@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(891795827403111),0628-070251-hove428,,WriteSerializable,False,"Map(numFiles -> 5, numOutputBytes -> 5602, numOutputRows -> 4, numParts -> 0)"


In [37]:
%sql

-- Inserted / updated / deleted row counts taken from the single history row returned by LIMIT 1
SELECT
  operationMetrics.numTargetRowsInserted,
  operationMetrics.numTargetRowsUpdated,
  operationMetrics.numTargetRowsDeleted
FROM (DESCRIBE HISTORY delta_demo.product_table_delta LIMIT 1)

numTargetRowsInserted,numTargetRowsUpdated,numTargetRowsDeleted
0,6,2


# TIME TRAVEL

In [41]:
%sql

-- Time travel: read the table as it was at version 1
SELECT * FROM delta_demo.product_table_delta VERSION AS OF 1

product_id,product_name,row_flag,stocks_left
1002,Apple iPhone 11,I,22
1001,Apple Mac Book,I,11
1004,Dell Inspiron,I,44
1003,Redmi Note 4,I,33


# SHOW TRANSACTIONAL LOG FILES

In [45]:
# List the files in the table's Delta transaction log directory
# (Python form of the `%fs ls` shorthand).
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log"))

path,name,size
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-0,.s3-optimization-0,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-1,.s3-optimization-1,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-2,.s3-optimization-2,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,3287
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,3414
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,91
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,2700
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,91


In [46]:
# Same listing of the transaction log directory, addressed with a trailing slash
# (Python form of the `%fs ls` shorthand).
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/"))

path,name,size
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-0,.s3-optimization-0,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-1,.s3-optimization-1,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/.s3-optimization-2,.s3-optimization-2,0
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.crc,00000000000000000000.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000000.json,00000000000000000000.json,3287
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.crc,00000000000000000001.crc,89
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000001.json,00000000000000000001.json,3414
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.crc,00000000000000000002.crc,91
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000002.json,00000000000000000002.json,2700
dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000003.crc,00000000000000000003.crc,91


In [47]:
# Show the beginning of the version-6 commit file as raw text
# (Python form of the `%fs head` shorthand).
print(dbutils.fs.head("dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000006.json"))

In [48]:
# Parse the version-6 commit file as JSON so each logged action
# (add / commitInfo / metaData) shows up as a column.
commit_6_path = "dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000006.json"
commit_6_actions_df = spark.read.json(commit_6_path)
display(commit_6_actions_df)

add,commitInfo,metaData
,"List(0628-102648-pick544, true, WriteSerializable, List(891795827403111), WRITE, List(5, 6618, 4, 0), List(Append, []), 5, 1593340899589, 1583409609571278, techieeisland@gmail.com)",
,,"List(1593330009451, List(parquet), 59535559-84e1-46da-a566-06c753305e5a, List(), {""type"":""struct"",""fields"":[{""name"":""product_id"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""product_name"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""row_flag"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""stocks_left"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""new_col"",""type"":""string"",""nullable"":true,""metadata"":{}}]})"
"List(true, 1593340900000, part-00000-1baa5b2e-f525-45d6-9ea4-ae79ba1bdceb-c000.snappy.parquet, 645, {""numRecords"":0,""minValues"":{},""maxValues"":{},""nullCount"":{}})",,
"List(true, 1593340900000, part-00001-d26756df-406f-4db0-a66f-ab5ccf574d96-c000.snappy.parquet, 1498, {""numRecords"":1,""minValues"":{""new_col"":""new_1"",""product_id"":1001,""product_name"":""Apple Mac Book"",""row_flag"":""I"",""stocks_left"":11},""maxValues"":{""new_col"":""new_1"",""product_id"":1001,""product_name"":""Apple Mac Book"",""row_flag"":""I"",""stocks_left"":11},""nullCount"":{""new_col"":0,""product_id"":0,""product_name"":0,""row_flag"":0,""stocks_left"":0}})",,
"List(true, 1593340900000, part-00003-a9cd7ebe-f8b6-4f0c-9344-997cab24eb3b-c000.snappy.parquet, 1507, {""numRecords"":1,""minValues"":{""new_col"":""new_2"",""product_id"":1002,""product_name"":""Apple iPhone 11"",""row_flag"":""I"",""stocks_left"":22},""maxValues"":{""new_col"":""new_2"",""product_id"":1002,""product_name"":""Apple iPhone 11"",""row_flag"":""I"",""stocks_left"":22},""nullCount"":{""new_col"":0,""product_id"":0,""product_name"":0,""row_flag"":0,""stocks_left"":0}})",,
"List(true, 1593340900000, part-00005-db28f365-bae8-43c1-89ba-85bf77fc7932-c000.snappy.parquet, 1480, {""numRecords"":1,""minValues"":{""new_col"":""new_3"",""product_id"":1003,""product_name"":""Redmi Note 4"",""row_flag"":""I"",""stocks_left"":33},""maxValues"":{""new_col"":""new_3"",""product_id"":1003,""product_name"":""Redmi Note 4"",""row_flag"":""I"",""stocks_left"":33},""nullCount"":{""new_col"":0,""product_id"":0,""product_name"":0,""row_flag"":0,""stocks_left"":0}})",,
"List(true, 1593340900000, part-00007-7ad2390f-b48f-422c-96e6-728bbc893028-c000.snappy.parquet, 1489, {""numRecords"":1,""minValues"":{""new_col"":""new_4"",""product_id"":1004,""product_name"":""Dell Inspiron"",""row_flag"":""I"",""stocks_left"":44},""maxValues"":{""new_col"":""new_4"",""product_id"":1004,""product_name"":""Dell Inspiron"",""row_flag"":""I"",""stocks_left"":44},""nullCount"":{""new_col"":0,""product_id"":0,""product_name"":0,""row_flag"":0,""stocks_left"":0}})",,


In [49]:
# Parse the version-7 commit file (the MERGE) as JSON; its actions appear
# as add / commitInfo / remove columns.
commit_7_path = "dbfs:/user/hive/warehouse/delta_demo.db/product_table_delta/_delta_log/00000000000000000007.json"
commit_7_actions_df = spark.read.json(commit_7_path)
display(commit_7_actions_df)

add,commitInfo,remove
,"List(0628-102648-pick544, false, WriteSerializable, List(891795827403111), MERGE, List(4, 5583, 6, 0, 4, 4, 10, 10, 8, 0, 2, 0, 6), List((source_table.`row_flag` = 'D'), (source_table.`row_flag` = 'I'), (source_table.`product_id` = target_table.`product_id`), (source_table.`row_flag` = 'U')), 6, 1593341532506, 1583409609571278, techieeisland@gmail.com)",
,,"List(true, 1593341532454, part-00005-db28f365-bae8-43c1-89ba-85bf77fc7932-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00007-7ad2390f-b48f-422c-96e6-728bbc893028-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00001-d26756df-406f-4db0-a66f-ab5ccf574d96-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00003-f5307a9d-8be6-42d9-a9af-0b276c81e1d0-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00007-88bcdf6d-cb6b-4013-adaf-d63acc344dc8-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00003-a9cd7ebe-f8b6-4f0c-9344-997cab24eb3b-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00005-8b1e9299-6d7d-4e81-8650-d7b3b6d72626-c000.snappy.parquet)"
,,"List(true, 1593341532502, part-00001-60eb97be-368d-4dca-9643-191e93e04c93-c000.snappy.parquet)"
"List(true, 1593341529000, part-00000-dabb379d-646b-416d-b4f7-e2840c0a096b-c000.snappy.parquet, 645, {""numRecords"":0,""minValues"":{},""maxValues"":{},""nullCount"":{}})",,


### Explain scalable metadata handling