# Introduction
> Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table's schema.

**Reference**

https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html




# Prerequsites

1. Connecting to ADLS and Create DataFrame
2. Create DataFrame

## Loading data into DF

In [0]:
users_ext_loc = (
    spark.sql(" DESCRIBE EXTERNAL LOCATION `users_ext_loc`")
    .select("url")
    .collect()[0][0]
)
print(users_ext_loc)

In [0]:
file_names = "/FileStore/sandbox/users/{users_001.csv}"
print(file_names)

/FileStore/sandbox/users/{users_001.csv}


In [0]:
from pyspark.sql.types import *
users_ext_loc = "/FileStore/sandbox/users/"
file_names = "{users_001.csv}"
users_dataset_path = f"{users_ext_loc}/{file_names}"

user_df = spark.read.csv(
    path=f"{users_ext_loc}/{file_names}", header=True, inferSchema=True
)
print(f"Total No. of Records {user_df.count()}")

Total No. of Records 500


## Create Schema 

In [0]:
%sql
create database deltadb;


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4347642085200022>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-4347642085200022>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;

## Create Table and Load Data

In [0]:
user_df.write.format("delta").mode("append").saveAsTable("deltadb.user_info")

In [0]:
%sql

DESCRIBE FORMATTED deltadb.user_info;

col_name,data_type,comment
id,int,
name,string,
dob,date,
email,string,
gender,string,
country,string,
region,string,
city,string,
asset,int,
marital_status,string,


## Read Delta Table

In [0]:
spark.read.table("deltadb.user_info").limit(num=3).display()

id,name,dob,email,gender,country,region,city,asset,marital_status
1,Heather Gibbs,2024-10-31,heathergibbs6243@gmail.com,Female,United States,Virginia,Virginia Beach,734388,Married
2,Herrod Petersen,2024-02-19,herrodpetersen@yahoomail.com,Male,United States,Arizona,Phoenix,113506,Single
3,Ocean Workman,2024-10-10,oceanworkman2328@ymail.com,Male,United States,Tennessee,Clarksville,139985,Married


# Source with More Columns

## Read Data with more Columns

In [0]:
file_names = "{users_006_new_column_education.csv}"
users_dataset_path = f"{users_ext_loc}/{file_names}"
user_df_with_more_cols = spark.read.csv(
    path=users_dataset_path, header=True, inferSchema=True
)

user_df_with_more_cols.limit(5).display()

id,name,dob,email,gender,country,region,city,asset,marital_status,education
2501,Macy Holcomb,2024-10-20,macyholcomb@yahoomail.com,Transgender,India,Karnataka,Bidar,455582,Common Law,Bachelor's Degree
2502,Zeph Shepherd,2024-03-10,zephshepherd5778@ymail.com,Transgender,India,Dadra and Nagar Haveli,Silvassa,480168,Common Law,Bachelor's Degree
2503,Elton Stark,2025-03-23,eltonstark7778@yahoomail.com,Female,India,Tamil Nadu,Ambattur,428049,Single,Bachelor's Degree
2504,Alisa Cook,2025-03-19,alisacook4349@gmail.com,Male,United States,Maine,Bangor,594097,Common Law,Bachelor's Degree
2505,Boris Snyder,2024-08-03,borissnyder2540@yahoomail.com,Male,United States,Louisiana,Baton Rouge,977312,Divorced,Bachelor's Degree



## Write Data with more Columns

In [0]:
user_df_with_more_cols.write.format("delta").mode("append").saveAsTable(
    "deltadb.user_info"
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4347642085200031>:1[0m
[0;32m----> 1[0m [43muser_df_with_more_cols[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mappend[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msaveAsTable[49m[43m([49m
[1;32m      2[0m [43m    [49m[38;5;124;43m"[39;49m[38;5;124;43mdeltadb.user_info[39;49m[38;5;124;43m"[39;49m
[1;32m      3[0m [43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_c

## Solution

In [0]:
user_df_with_more_cols.write.format("delta").option("mergeSchema", "true").mode(
    "append"
).saveAsTable("deltadb.user_info")

spark.read.table("deltadb.user_info").limit(num=3).display()

id,name,dob,email,gender,country,region,city,asset,marital_status,education
1,Heather Gibbs,2024-10-31,heathergibbs6243@gmail.com,Female,United States,Virginia,Virginia Beach,734388,Married,
2,Herrod Petersen,2024-02-19,herrodpetersen@yahoomail.com,Male,United States,Arizona,Phoenix,113506,Single,
3,Ocean Workman,2024-10-10,oceanworkman2328@ymail.com,Male,United States,Tennessee,Clarksville,139985,Married,


In [0]:

from pyspark.sql.functions import col
spark.read.table("deltadb.user_info").filter(col("id")>2500).limit(num=3).display()

id,name,dob,email,gender,country,region,city,asset,marital_status,education
2501,Macy Holcomb,2024-10-20,macyholcomb@yahoomail.com,Transgender,India,Karnataka,Bidar,455582,Common Law,Bachelor's Degree
2502,Zeph Shepherd,2024-03-10,zephshepherd5778@ymail.com,Transgender,India,Dadra and Nagar Haveli,Silvassa,480168,Common Law,Bachelor's Degree
2503,Elton Stark,2025-03-23,eltonstark7778@yahoomail.com,Female,India,Tamil Nadu,Ambattur,428049,Single,Bachelor's Degree


# Source with Less Columns


## Read Data with Less Columns

In [0]:
file_names = "{users_012_less_columns.csv}"
users_dataset_path = f"{users_ext_loc}/{file_names}"


user_df_with_less_cols = spark.read.csv(
    path=users_dataset_path, header=True, inferSchema=True
)
user_df_with_less_cols.printSchema()


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)




## Write Data with Less Columns

In [0]:

user_df_with_less_cols.write.format("delta").mode("append").saveAsTable("deltadb.user_info")

## Read Delta Table

In [0]:
from pyspark.sql.functions import *
user_delta_df = spark.read.table("deltadb.user_info")
print(f"Total No. of Records: {user_delta_df.count()}")
display(user_delta_df.filter(col("id")>5000).show())

Total No. of Records: 1500
+----+--------------------+----+-----+------+-------+------+----+-----+--------------+---------+
|  id|                name| dob|email|gender|country|region|city|asset|marital_status|education|
+----+--------------------+----+-----+------+-------+------+----+-----+--------------+---------+
|5501|      Berk Rasmussen|null| null|  null|   null|  null|null| null|          null|     null|
|5502|    Thaddeus Bradley|null| null|  null|   null|  null|null| null|          null|     null|
|5503|     Brandon Randall|null| null|  null|   null|  null|null| null|          null|     null|
|5504|       Duncan Valdez|null| null|  null|   null|  null|null| null|          null|     null|
|5505|   Azalia Montgomery|null| null|  null|   null|  null|null| null|          null|     null|
|5506|    Elton Washington|null| null|  null|   null|  null|null| null|          null|     null|
|5507|    Joseph Frederick|null| null|  null|   null|  null|null| null|          null|     null|
|55


# Source with different Data Type

## Read Data with Different DataTypes

In [0]:
file_names = "/raw/{users_011_datatype_mismatch_dob.csv}"
users_dataset_path = f"{users_ext_loc}/{file_names}"
user_df_with_diff_type = spark.read.csv(path=users_dataset_path,
                         header=True,
                         inferSchema=True)
user_df_with_diff_type.printSchema()
user_df_with_diff_type.limit(3).display()

## Write Data with Different Column Type

In [0]:

user_df_with_diff_type.write.format("delta").mode("append").saveAsTable("user_info")

## Solution : Cannot Merge Incompatible  Schema

In [0]:
user_df_with_diff_type.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("user_info")

In [0]:
spark.read.table("user_info").limit(10).display()