### Pyspark Merge Statement

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema shared by the employee DataFrames built in this notebook.
# Every field is declared nullable (third StructField argument is True).
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("emp_id", IntegerType()),
            ("emp_name", StringType()),
            ("city", StringType()),
            ("country", StringType()),
            ("contact_no", IntegerType()),
        )
    ]
)


In [0]:
# Initial source batch: a single employee row shaped like `schema`.
data = [
    (1000, 'Michael', 'Columbus', 'USA', 689546323),
]
df = spark.createDataFrame(data, schema)

display(df)

In [0]:
%sql
-- (Re)create the Delta target table; its columns mirror the DataFrame schema.
CREATE OR REPLACE TABLE dim_employee (
  emp_id     INT,
  emp_name   STRING,
  city       STRING,
  country    STRING,
  contact_no INT
)
USING DELTA

In [0]:
%sql
-- Inspect the target table right after it is created.
SELECT * FROM dim_employee

### Method 1 - Spark SQL

In [0]:
# Expose the source DataFrame to Spark SQL so the MERGE statement can read it.
# The view name is spelled in lowercase to match every later reference to
# `source_view`, instead of relying on case-insensitive identifier resolution.
df.createOrReplaceTempView("source_view")

In [0]:
%sql
-- Preview the rows that will be merged into the target.
SELECT * FROM source_view

In [0]:
%sql
-- Target table contents before the first merge.
SELECT * FROM dim_employee

In [0]:
%sql
-- Upsert source_view into dim_employee keyed on emp_id:
-- matching rows are overwritten, unmatched source rows are inserted.
MERGE INTO dim_employee AS target
USING source_view AS source
  ON target.emp_id = source.emp_id
WHEN MATCHED THEN
  UPDATE SET
    target.emp_name   = source.emp_name,
    target.city       = source.city,
    target.country    = source.country,
    target.contact_no = source.contact_no
WHEN NOT MATCHED THEN
  INSERT (emp_id, emp_name, city, country, contact_no)
  VALUES (source.emp_id, source.emp_name, source.city, source.country, source.contact_no)


  

In [0]:
%sql
-- Target table contents after the first merge.
SELECT * FROM dim_employee

In [0]:
# Second source batch: emp_id 1000 again with a different city, plus a new emp_id 2000.
data = [
    (1000, 'Michael', 'Chicago', 'USA', 689546323),
    (2000, 'Nancy', 'New York', 'USA', 76345902),
]
df = spark.createDataFrame(data, schema)
display(df)

In [0]:
# Re-register the temp view so `source_view` now points at the current `df`.
df.createOrReplaceTempView("source_view")

In [0]:
%sql
-- Preview the refreshed source rows.
SELECT * FROM source_view

In [0]:
%sql
-- Target table contents before the second merge.
SELECT * FROM dim_employee

In [0]:
%sql
-- Same upsert as before, now fed by the refreshed source_view:
-- rows with a matching emp_id are updated, the rest are inserted.
MERGE INTO dim_employee AS target
USING source_view AS source
  ON target.emp_id = source.emp_id
WHEN MATCHED THEN
  UPDATE SET
    target.emp_name   = source.emp_name,
    target.city       = source.city,
    target.country    = source.country,
    target.contact_no = source.contact_no
WHEN NOT MATCHED THEN
  INSERT (emp_id, emp_name, city, country, contact_no)
  VALUES (source.emp_id, source.emp_name, source.city, source.country, source.contact_no)

In [0]:
%sql
-- Target table contents after the second merge.
SELECT * FROM dim_employee

### Method 2 - Pyspark

In [0]:
# Source batch for the PySpark merge: emp_id 2000 with a new name, plus a new emp_id 3000.
data = [
    (2000, 'Sarah', 'New York', 'USA', 76345902),
    (3000, 'Robert', 'Chicago', 'USA', 76345902),
]
df = spark.createDataFrame(data, schema)
display(df)

In [0]:
# Import DeltaTable in this cell so it runs on a fresh kernel: the delta.tables
# import visible elsewhere in this notebook only appears in a later cell, so
# without this line a top-to-bottom run fails here with NameError.
from delta.tables import DeltaTable

# Load the target Delta table by its fully qualified name.
target_table = DeltaTable.forName(
    spark,
    "workspace.default.dim_employee"
)

In [0]:
# Upsert `df` into the Delta table keyed on emp_id: rows that already exist in
# the target are updated, all remaining source rows are inserted.
(
    target_table.alias("target")
    .merge(df.alias("source"), "target.emp_id = source.emp_id")
    .whenMatchedUpdate(
        set={
            "emp_name": "source.emp_name",
            "city": "source.city",
            "country": "source.country",
            "contact_no": "source.contact_no",
        }
    )
    .whenNotMatchedInsert(
        values={
            "emp_id": "source.emp_id",
            "emp_name": "source.emp_name",
            "city": "source.city",
            "country": "source.country",
            "contact_no": "source.contact_no",
        }
    )
    .execute()
)

In [0]:
%sql
-- Target table contents after the PySpark merge, read via its fully qualified name.
SELECT * FROM workspace.default.dim_employee

In [0]:
# Show the source DataFrame again for comparison with the merged table above.
display(df)

### Lecture 56 - Create Audit Log

In [0]:
%sql
-- Create the audit log table only if it is missing, so re-running the notebook
-- neither fails on an existing table nor discards previously logged rows.
-- The numTargetRows* columns hold the per-operation row counts taken from the
-- Delta history operationMetrics.
CREATE TABLE IF NOT EXISTS audit_log (
  operation             STRING,
  updated_time          TIMESTAMP,
  user_name             STRING,
  notebook_name         STRING,
  numTargetRowsUpdated  INT,
  numTargetRowsInserted INT,
  numTargetRowsDeleted  INT
)

In [0]:
%sql
-- Inspect the audit log table before anything is inserted by this run.
SELECT * FROM audit_log

### Create Dataframe with Last Operation in Delta Table

In [0]:
from delta.tables import *
# Handle on the Delta table whose transaction history feeds the audit log.
delta_df = DeltaTable.forName(spark, "workspace.default.dim_employee")

# history(1) restricts the history to the single most recent operation.
lastOperationDF = delta_df.history(1)
display(lastOperationDF)

In [0]:
# Show the three most recent operations recorded in the table history.
display(delta_df.history(3))

### Explode Operation Metrics Column

In [0]:
# Turn the operationMetrics map into one (key, value) row per metric,
# keeping the operation name alongside each metric.
explode_df = lastOperationDF.select(
    lastOperationDF["operation"],
    explode(lastOperationDF["operationMetrics"]),
)
display(explode_df)

# Cast the metric values to int so they can be aggregated numerically.
explode_df_select = explode_df.select(
    explode_df["operation"],
    explode_df["key"],
    explode_df["value"].cast('int'),
)
display(explode_df_select)

### Pivot Operation to Convert Rows to Columns

In [0]:
# One row per operation, one column per metric key, each cell holding the summed value.
pivot_df = (
    explode_df_select
    .groupBy("operation")
    .pivot("key")
    .sum("value")
)
display(pivot_df)

### Select Only Columns Needed for Audit Log Table

In [0]:
# Keep only the operation name and the three row-count metrics the audit log stores.
pivot_df_select = pivot_df.select(
    "operation",
    "numTargetRowsInserted",
    "numTargetRowsUpdated",
    "numTargetRowsDeleted",
)
display(pivot_df_select)

### Add Notebook Parameters such as UserName, Notebook Path etc.

In [0]:
# Resolve the notebook context once instead of repeating the lookup chain per column.
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()

# Enrich the metrics row with who ran the notebook, which notebook it was,
# and when this audit record was produced.
auditDF = (
    pivot_df_select
    .withColumn('user_name', lit(notebook_context.userName().get()))
    .withColumn('notebook_name', lit(notebook_context.notebookPath().get()))
    # current_timestamp() already yields a Column, so no lit() wrapper is needed.
    .withColumn('updated_time', current_timestamp())
)

display(auditDF)

### Re-arrange Columns in Dataframe to Match the Audit Log Table

In [0]:
# Put the columns in the same order as the audit_log table definition.
auditDF_select = auditDF.select(
    "operation",
    "updated_time",
    "user_name",
    "notebook_name",
    "numTargetRowsUpdated",
    "numTargetRowsInserted",
    "numTargetRowsDeleted",
)

display(auditDF_select)

### Create Temp View on Dataframe

In [0]:
# Register the audit row as temp view `audit` so the SQL cells can read it.
auditDF_select.createOrReplaceTempView("audit")

In [0]:
%sql
-- Preview the audit row that is about to be logged.
SELECT * FROM audit

In [0]:
%sql
-- Audit log contents before the insert.
SELECT * FROM audit_log

### Insert Audit Data into Audit Log Table

In [0]:
%sql
-- Append the audit row to the log. Columns are listed explicitly on both sides
-- so the insert maps by name rather than by position: a positional
-- `SELECT *` would silently write values into the wrong columns if the
-- view's column order ever differed from the table's.
INSERT INTO audit_log (
  operation,
  updated_time,
  user_name,
  notebook_name,
  numTargetRowsUpdated,
  numTargetRowsInserted,
  numTargetRowsDeleted
)
SELECT
  operation,
  updated_time,
  user_name,
  notebook_name,
  numTargetRowsUpdated,
  numTargetRowsInserted,
  numTargetRowsDeleted
FROM audit

In [0]:
%sql
-- Audit log contents after the insert.
SELECT * FROM audit_log