# Schema Validation

1. What is Schema-on-read / Schema-on-write? 
2. Column order validation
3. Data Type validation
4. Column name validation
5. Nullability validation
6. Extra columns validation

In [0]:
DROP TABLE deltacatalog.deltadb.invoices_sv;
CREATE OR REPLACE TABLE deltacatalog.deltadb.invoices_sv (
  customer_id INT NOT NULL,  
  invoice_no STRING,
  quantity INT,
  price FLOAT, 
  invoice_date DATE
); 

INSERT INTO deltacatalog.deltadb.invoices_sv
SELECT customer_id, invoice_no, quantity, price, invoice_date
FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_1_100.parquet`; 

num_affected_rows,num_inserted_rows
100,100


In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
LIMIT 5;

customer_id,invoice_no,quantity,price,invoice_date
1,I178410,5,1500.4,2021-11-26
2,I158163,2,1200.34,2023-03-03
3,I262373,3,107.52,2022-12-01
4,I334895,5,26.15,2021-08-15
5,I202043,1,35.84,2021-07-25


In [0]:
SELECT MIN(customer_id), MAX(customer_id), COUNT(*)
FROM deltacatalog.deltadb.invoices_sv;

min(customer_id),max(customer_id),count(1)
1,100,100


#### Scenario 1: Column Order Validation

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
SELECT quantity, invoice_no, customer_id, price, invoice_date
FROM VALUES (99999, 'I12345', 10, 100, '2025-01-01')
AS T(customer_id, invoice_no, quantity, price, invoice_date)

num_affected_rows,num_inserted_rows
1,1


In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id = 99999;

customer_id,invoice_no,quantity,price,invoice_date


In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id = 10;

customer_id,invoice_no,quantity,price,invoice_date
10,I280920,3,900.24,2021-04-23
10,I12345,99999,100.0,2025-01-01


In [0]:
SELECT customer_id, invoice_no, quantity, price, invoice_date
FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
ORDER BY customer_id DESC LIMIT 5;

customer_id,invoice_no,quantity,price,invoice_date
200,I856190,5,1500.4,2022-05-17
199,I205401,2,600.16,2022-03-21
198,I233680,2,71.68,2021-05-11
197,I865759,1,5.23,2021-12-04
196,I171884,3,1800.51,2021-07-25


In [0]:
MERGE INTO deltacatalog.deltadb.invoices_sv AS tgt
USING (
  SELECT quantity, invoice_no, customer_id, CAST (price AS FLOAT), invoice_date
  FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
  ORDER BY customer_id DESC LIMIT 5
) src 
ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN 
  UPDATE SET 
    tgt.customer_id = src.customer_id,
    tgt.invoice_no = src.invoice_no,
    tgt.quantity = src.quantity,
    tgt.price = src.price,
    tgt.invoice_date = current_date
WHEN NOT MATCHED THEN
  INSERT * 

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
5,0,0,5


In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id > 100; 

customer_id,invoice_no,quantity,price,invoice_date
200,I856190,5,1500.4,2022-05-17
199,I205401,2,600.16,2022-03-21
198,I233680,2,71.68,2021-05-11
197,I865759,1,5.23,2021-12-04
196,I171884,3,1800.51,2021-07-25


#### Scenario 2: Data Type Validation

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
VALUES (
  'ABC', 
  'I45678',
  10,
  98.75,
  '2025-01-01'
)

org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'ABC' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22018
== SQL (line 1, position 1) ==
INSERT INTO deltacatalog.deltadb.invoices_sv
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:193)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toIntExact(UTF8StringUtils.scala:34)
	at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToInt$2(Cast.scala:909)
	at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToInt$2$adapted(Cast.scala:909)
	at org.apac

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
VALUES (
  '99499', 
  'I45678',
  10,
  98.75,
  '2025-01-01'
)

num_affected_rows,num_inserted_rows
1,1


#### Scenario 3: Column Name Validation

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
SELECT customer_id AS c_id, invoice_no, quantity AS qty, price, invoice_date
FROM VALUES (99999, 'I12345', 10, 100, '2025-01-01')
AS T(customer_id, invoice_no, quantity, price, invoice_date)

num_affected_rows,num_inserted_rows
1,1


In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id = 99999;

customer_id,invoice_no,quantity,price,invoice_date
99999,I12345,10,100.0,2025-01-01


In [0]:
SELECT customer_id, invoice_no, quantity, CAST (price AS FLOAT), invoice_date
FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
ORDER BY customer_id LIMIT 5

customer_id,invoice_no,quantity,price,invoice_date
101,I302068,2,1200.34,2022-07-15
102,I193229,5,1500.4,2021-03-05
103,I313092,2,81.32,2022-04-23
104,I258750,3,15.69,2022-04-04
105,I126182,5,1500.4,2022-02-06


In [0]:
MERGE INTO deltacatalog.deltadb.invoices_sv AS tgt
USING (
  SELECT customer_id AS c_id, invoice_no, quantity AS qty, CAST (price AS FLOAT), invoice_date
  FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
  ORDER BY customer_id LIMIT 5
) src 
ON tgt.customer_id = src.c_id
WHEN MATCHED THEN 
  UPDATE SET 
    tgt.customer_id = src.c_id,
    tgt.invoice_no = src.invoice_no,
    tgt.quantity = src.qty,
    tgt.price = src.price,
    tgt.invoice_date = current_date
WHEN NOT MATCHED THEN
  INSERT * 

com.databricks.sql.transaction.tahoe.DeltaAnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve customer_id in INSERT clause given columns src.c_id, src.invoice_no, src.qty, src.price, src.invoice_date.; line 1 pos 0
	at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$4(deltaMerge.scala:446)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.assertResolved$1(deltaMerge.scala:439)
	at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1(deltaMerge.scala:425)
	at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1$adapted(deltaMerge.scala:425)
	at scala.collection.immutable.List.foreach(List.scala:431

In [0]:
SELECT * FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id > 100;

customer_id,invoice_no,quantity,price,invoice_date
200,I856190,5,1500.4,2022-05-17
199,I205401,2,600.16,2022-03-21
198,I233680,2,71.68,2021-05-11
197,I865759,1,5.23,2021-12-04
196,I171884,3,1800.51,2021-07-25
99499,I45678,10,98.75,2025-01-01
99999,I12345,10,100.0,2025-01-01


#### Scenario 4: Nullability Validation

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
VALUES (NULL, NULL, NULL, NULL, NULL);

com.databricks.sql.transaction.tahoe.schema.DeltaInvariantViolationException: [DELTA_NOT_NULL_CONSTRAINT_VIOLATED] NOT NULL constraint violated for column: customer_id.

	at com.databricks.sql.transaction.tahoe.schema.DeltaInvariantViolationException$.getNotNullInvariantViolationException(InvariantViolationException.scala:57)
	at com.databricks.photon.PhotonException$.getSparkException(PhotonException.scala:249)
	at com.databricks.photon.PhotonWriteResultHandler.getResult(PhotonWriteStageExec.scala:138)
	at com.databricks.photon.PhotonBasicEvaluatorFactory$PhotonBasicEvaluator$$anon$1.hasNext(PhotonBasicEvaluatorFactory.scala:207)
	at com.databricks.photon.CloseableIterator$$anon$10.hasNext(CloseableIterator.scala:211)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeUsingPhoton$2(FileFormatWriter.scala:672)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(Executor

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
VALUES (78912, NULL, NULL, NULL, NULL);

num_affected_rows,num_inserted_rows
1,1


#### Scenario 5: Extra Column Validation

In [0]:
INSERT INTO deltacatalog.deltadb.invoices_sv
SELECT customer_id, invoice_no, quantity, price, invoice_date, "VIP" AS customer_type
FROM VALUES (78999, 'I12345', 10, 100, '2025-01-01')
AS T(customer_id, invoice_no, quantity, price, invoice_date)

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 080db77e-e742-47c2-96dd-5796ca1c0bed).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- customer_id: integer (nullable = false)
-- invoice_no: string (nullable = true)
-- quantity: integer (nullable = true)
-- price: float (nullable = true)
-- invoice_date: date (nullable = true)


Data schema:
root
-- customer_id: integer (nullable = true)
-- invoice_no: string (nullable = true)
-- quantity: integer (nullable = true)
-- price: float (nullable = true)
-- invoice_date: date (nullable = true)
-- customer_type: string (nullable = true)

         
	at com.databricks.sql.transaction.tahoe.MetadataMismatchErrorBuilder.finalize

In [0]:
SELECT customer_id, invoice_no, quantity, price, invoice_date
FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
WHERE customer_id BETWEEN 150 AND 155

customer_id,invoice_no,quantity,price,invoice_date
150,I500960,4,1200.32,2022-12-31
151,I125404,2,71.68,2021-05-27
152,I182701,3,3150.0,2021-02-05
153,I197462,2,1200.34,2022-06-06
154,I154490,5,1500.4,2022-01-24
155,I187525,3,900.24,2022-12-16


In [0]:
MERGE INTO deltacatalog.deltadb.invoices_sv AS tgt
USING (
  SELECT customer_id, invoice_no, quantity, price, invoice_date, "VIP" AS customer_type
  FROM PARQUET.`abfss://labdata@dbdeltalabstorageacct.dfs.core.windows.net/invoices/invoices_101_200.parquet`
  WHERE customer_id BETWEEN 150 AND 155
) src 
ON tgt.customer_id = src.customer_id
WHEN NOT MATCHED THEN
  INSERT * 

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
6,0,0,6


In [0]:
SELECT customer_id, invoice_no, quantity, price, invoice_date
FROM deltacatalog.deltadb.invoices_sv
WHERE customer_id BETWEEN 150 AND 155

customer_id,invoice_no,quantity,price,invoice_date
155,I187525,3,900.24,2022-12-16
152,I182701,3,3150.0,2021-02-05
154,I154490,5,1500.4,2022-01-24
151,I125404,2,71.68,2021-05-27
150,I500960,4,1200.32,2022-12-31
153,I197462,2,1200.34,2022-06-06
