In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException

# **Create Delta Table**

## using sql api

In [0]:
%sql
-- Create the Delta table used by the SQL examples in this notebook.
-- IF NOT EXISTS keeps the cell re-runnable: a second run is a no-op
-- instead of failing because the table already exists.
create table if not exists delta.deltaschema.deltatable
(
  id int not null,
  name string,
  age int
)
using delta

## insert values

In [0]:
%sql
-- Append four sample rows to the table (every run of this cell appends them again).
-- String values use single quotes, the standard SQL literal syntax; double-quoted
-- text is parsed as an identifier when ANSI double-quoted identifiers are enabled.
insert into delta.deltaschema.deltatable(id,name,age) values
(1,'suresh',25),
(2,'kumar',30),
(3,'ram',35),
(4,'murugan',40)

## read the data

In [0]:
%sql
-- Show every row currently stored in the table.
SELECT *
FROM delta.deltaschema.deltatable

## create dataframe

In [0]:
# Five sample rows with columns id, name and age; only column names are given,
# so the column types are left to createDataFrame.
schema = ['id', 'name', 'age']
data = [
    (1, 'John', 25),
    (2, 'Smith', 30),
    (3, 'Sarah', 35),
    (4, 'Mary', 40),
    (5, 'Mike', 45),
]

df = spark.createDataFrame(data, schema)

display(df)

## write data

In [0]:
# Append df in Delta format to the volume path used as the sink for this notebook.
(
    df.write
    .format("delta")
    .mode("append")
    .save("/Volumes/delta/deltaschema/sink")
)

## read the delta data (stored on disk as parquet files)

In [0]:
%sql
-- Query the Delta table directly by its storage path.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

# **SCHEMA ENFORCEMENT**

In [0]:
# Five sample rows with a fourth column, city, in addition to id, name and age.
schema1 = ['id', 'name', 'age', 'city']
data1 = [
    (1, 'John', 25, "chennai"),
    (2, 'Smith', 30, "salem"),
    (3, 'Sarah', 35, "covai"),
    (4, 'Mary', 40, "trichy"),
    (5, 'Mike', 45, "ooty"),
]

df1 = spark.createDataFrame(data1, schema1)

display(df1)

## A DataFrame with schema ['id', 'name', 'age'] has already been appended to /Volumes/delta/deltaschema/sink. Appending another DataFrame with a different schema ['id','name','age','city'] to the same location fails with a schema-mismatch error (schema enforcement); to avoid the error we use SCHEMA EVOLUTION

In [0]:
# Schema enforcement demo: df1 has an extra 'city' column that the data already
# written to this path does not have, so a plain append is expected to be rejected.
# The error is caught and printed so the cells below still run on "Run All".
try:
    (
        df1.write
        .format("delta")
        .mode("append")
        .save("/Volumes/delta/deltaschema/sink")
    )
except AnalysisException as err:
    print(f"Append rejected by Delta schema enforcement:\n{err}")

# **SCHEMA EVOLUTION**

In [0]:
# Append df1 again, this time with mergeSchema enabled so the write may extend
# the table schema with df1's additional column.
(
    df1.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/Volumes/delta/deltaschema/sink")
)

In [0]:
%sql
-- Inspect the sink table after the mergeSchema append.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

# **SCHEMA OVERWRITE**

## Here the column names are changed: a DataFrame with the new schema is created and written to the same location

In [0]:
# Five sample rows whose columns carry cust_-prefixed names.
schema2 = ['cust_id', 'cust_name', 'cust_age', 'cust_city']
data2 = [
    (1, 'John', 25, "chennai"),
    (2, 'Smith', 30, "salem"),
    (3, 'Sarah', 35, "covai"),
    (4, 'Mary', 40, "trichy"),
    (5, 'Mike', 45, "ooty"),
]

df2 = spark.createDataFrame(data2, schema2)

display(df2)

In [0]:
# Schema enforcement demo: df2's columns (cust_id, cust_name, cust_age, cust_city)
# do not match the column names already stored at this path, so a plain append is
# expected to be rejected. The error is caught and printed so later cells still run.
try:
    (
        df2.write
        .format("delta")
        .mode("append")
        .save("/Volumes/delta/deltaschema/sink")
    )
except AnalysisException as err:
    print(f"Append rejected by Delta schema enforcement:\n{err}")

In [0]:
# Overwrite without overwriteSchema: the data would be replaced but the stored schema
# is kept, so df2's renamed columns are expected to be rejected as a schema mismatch.
# The error is caught and printed so later cells still run.
try:
    (
        df2.write
        .format("delta")
        .mode("overwrite")
        .save("/Volumes/delta/deltaschema/sink")
    )
except AnalysisException as err:
    print(f"Overwrite rejected by Delta schema enforcement:\n{err}")

In [0]:
# Overwrite the sink with df2 and, via overwriteSchema, replace the stored schema too.
(
    df2.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/Volumes/delta/deltaschema/sink")
)

In [0]:
%sql
-- Inspect the sink table after the schema overwrite.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

# **DML**

### **update**

In [0]:
%sql
-- Rename the customer with cust_id 1.
-- The new value is single-quoted (standard SQL string literal), matching the
-- other update statement in this notebook; double-quoted text is parsed as an
-- identifier when ANSI double-quoted identifiers are enabled.
update delta.`/Volumes/delta/deltaschema/sink`
set cust_name = 'john cena'
where cust_id = 1
    


In [0]:
%sql
-- Inspect the sink table after the update.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`


# **UPSERT**

In [0]:
# Source rows for the upsert: cust_id values 2 and 5 also appear in df2's data,
# while 6, 7 and 8 do not.
schema3 = ['cust_id', 'cust_name', 'cust_age', 'cust_city']
data3 = [
    (6, 'aaaaa', 25, "chennai"),
    (2, 'Smithaaaaaa', 30, "salem"),
    (7, 'bbbb', 35, "covai"),
    (8, 'cccc', 40, "trichy"),
    (5, 'Mikeeeeeeee', 45, "ooty"),
]

df3 = spark.createDataFrame(data3, schema3)

display(df3)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert df3 into the Delta table at the sink path, matching rows on cust_id:
# matched target rows take all source values, unmatched source rows are inserted.
dlt_obj = DeltaTable.forPath(spark, "/Volumes/delta/deltaschema/sink")

(
    dlt_obj.alias("target")
    .merge(df3.alias("source"), "target.cust_id = source.cust_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Inspect the sink table after the merge.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

# **TABLE UTILITY COMMANDS**

In [0]:
%sql
-- List the columns of the catalog table.
DESCRIBE delta.deltaschema.deltatable

## volumes

In [0]:
%sql
-- List the columns of the path-based Delta table.
DESCRIBE delta.`/Volumes/delta/deltaschema/sink`

In [0]:
%sql
-- Columns plus extended table details for the catalog table.
DESCRIBE EXTENDED delta.deltaschema.deltatable

## volumes

In [0]:
%sql
-- Columns plus extended table details for the path-based Delta table.
DESCRIBE EXTENDED delta.`/Volumes/delta/deltaschema/sink`

In [0]:
%sql
-- Show the commit history of the catalog table.
DESCRIBE HISTORY delta.deltaschema.deltatable

In [0]:
%sql
-- Show the commit history of the path-based Delta table.
DESCRIBE HISTORY delta.`/Volumes/delta/deltaschema/sink`

In [0]:
%sql
-- Time travel: read the sink table as it was at version 4.
-- NOTE(review): the version number is hard-coded; confirm it against
-- DESCRIBE HISTORY, since version numbers depend on how often earlier cells ran.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink` VERSION AS OF 4

In [0]:
%sql
-- Current state of the sink table, for comparison with the versioned read.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

In [0]:
%sql
-- Roll the sink table back to version 4.
-- NOTE(review): the version number is hard-coded; confirm it against
-- DESCRIBE HISTORY before running.
RESTORE TABLE delta.`/Volumes/delta/deltaschema/sink`
TO VERSION AS OF 4

In [0]:
%sql
-- Inspect the sink table after the restore.
SELECT *
FROM delta.`/Volumes/delta/deltaschema/sink`

In [0]:
%sql
-- List the table properties of the catalog table.
SHOW TBLPROPERTIES delta.deltaschema.deltatable

In [0]:
%sql
-- Clone deltatable into a new table, deltatable2.
-- IF NOT EXISTS keeps the cell re-runnable: without it a second run fails
-- because deltatable2 already exists.
create table if not exists delta.deltaschema.deltatable2
clone delta.deltaschema.deltatable
    


In [0]:
# Four sample rows (id, name, city) used by the optimize examples that follow.
schema4 = ['id', 'name', 'city']
data4 = [
    (1, "aaa", "chennai"),
    (2, "bbb", "salem"),
    (3, "ccc", "covai"),
    (4, "ddd", "trichy"),
]

df4 = spark.createDataFrame(data4, schema4)

display(df4)

In [0]:
# Append df4 in Delta format to the path used by the optimize examples.
(
    df4.write
    .format("delta")
    .mode("append")
    .save("/Volumes/delta/deltaschema/optimization/optimize")
)


In [0]:
%sql
-- Commit history of the optimize-demo table after the initial write.
DESCRIBE HISTORY delta.`/Volumes/delta/deltaschema/optimization/optimize/`

In [0]:
%sql
-- Change the name of the row with id 1 so the table gets another commit.
UPDATE delta.`/Volumes/delta/deltaschema/optimization/optimize/`
SET name = 'updated'
WHERE id = 1

In [0]:
%sql
-- Commit history of the optimize-demo table after the update.
DESCRIBE HISTORY delta.`/Volumes/delta/deltaschema/optimization/optimize/`

## optimize

In [0]:
%sql
-- Run OPTIMIZE on the path-based Delta table.
OPTIMIZE delta.`/Volumes/delta/deltaschema/optimization/optimize/`

In [0]:
%sql
-- Commit history of the optimize-demo table after OPTIMIZE.
DESCRIBE HISTORY delta.`/Volumes/delta/deltaschema/optimization/optimize/`

## zorder by

In [0]:
%sql

-- Run OPTIMIZE again, this time with Z-ordering on the id column.
OPTIMIZE delta.`/Volumes/delta/deltaschema/optimization/optimize/`
ZORDER BY (id)

In [0]:
%sql
-- Commit history of the optimize-demo table after the ZORDER run.
DESCRIBE HISTORY delta.`/Volumes/delta/deltaschema/optimization/optimize/`